blob: cd6d27bad3fa6f5bdef97635ce97c933f000aca4 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "strings"
23 "sync"
24 "time"
25
Scott Bakerf2596722019-09-27 12:39:56 -070026 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050027 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000028 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050029 "github.com/golang/protobuf/proto"
Scott Baker504b4802020-04-17 10:12:20 -070030 "github.com/golang/protobuf/ptypes"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050031 "github.com/google/uuid"
Maninderdfadc982020-10-28 14:04:33 +053032 "github.com/opencord/voltha-lib-go/v4/pkg/log"
33 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
khenaidoo43c82122018-11-22 18:38:28 -050034)
35
khenaidoo4c1a5bf2018-11-29 15:53:42 -050036// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
38//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050039type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050040 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050041 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050042}
43
npujar467fe752020-01-16 20:17:45 +053044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
khenaidoo43c82122018-11-22 18:38:28 -050047// SaramaClient represents the messaging proxy
48type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050049 cAdmin sarama.ClusterAdmin
Neha Sharmad1387da2020-05-07 20:07:28 +000050 KafkaAddress string
khenaidoo43c82122018-11-22 18:38:28 -050051 producer sarama.AsyncProducer
52 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050053 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040054 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050055 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050056 consumerType int
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050070 doneCh chan int
Scott Baker504b4802020-04-17 10:12:20 -070071 metadataCallback func(fromTopic string, timestamp time.Time)
khenaidoo43c82122018-11-22 18:38:28 -050072 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053076 metadataMaxRetry int
Scott Bakeree6a0872019-10-29 15:59:52 -070077 alive bool
Girish Kumarf8d4f8d2020-08-18 11:45:30 +000078 livenessMutex sync.Mutex
Scott Bakeree6a0872019-10-29 15:59:52 -070079 liveness chan bool
80 livenessChannelInterval time.Duration
81 lastLivenessTime time.Time
82 started bool
Girish Kumarf8d4f8d2020-08-18 11:45:30 +000083 healthinessMutex sync.Mutex
serkant.uluderya2ae470f2020-01-21 11:13:09 -080084 healthy bool
85 healthiness chan bool
khenaidoo43c82122018-11-22 18:38:28 -050086}
87
88type SaramaClientOption func(*SaramaClient)
89
Neha Sharmad1387da2020-05-07 20:07:28 +000090func Address(address string) SaramaClientOption {
khenaidoo43c82122018-11-22 18:38:28 -050091 return func(args *SaramaClient) {
Neha Sharmad1387da2020-05-07 20:07:28 +000092 args.KafkaAddress = address
khenaidoo43c82122018-11-22 18:38:28 -050093 }
94}
95
khenaidooca301322019-01-09 23:06:32 -050096func ConsumerGroupPrefix(prefix string) SaramaClientOption {
97 return func(args *SaramaClient) {
98 args.consumerGroupPrefix = prefix
99 }
100}
101
102func ConsumerGroupName(name string) SaramaClientOption {
103 return func(args *SaramaClient) {
104 args.consumerGroupName = name
105 }
106}
107
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500108func ConsumerType(consumer int) SaramaClientOption {
109 return func(args *SaramaClient) {
110 args.consumerType = consumer
111 }
112}
113
114func ProducerFlushFrequency(frequency int) SaramaClientOption {
115 return func(args *SaramaClient) {
116 args.producerFlushFrequency = frequency
117 }
118}
119
120func ProducerFlushMessages(num int) SaramaClientOption {
121 return func(args *SaramaClient) {
122 args.producerFlushMessages = num
123 }
124}
125
126func ProducerFlushMaxMessages(num int) SaramaClientOption {
127 return func(args *SaramaClient) {
128 args.producerFlushMaxmessages = num
129 }
130}
131
khenaidoo90847922018-12-03 14:47:51 -0500132func ProducerMaxRetries(num int) SaramaClientOption {
133 return func(args *SaramaClient) {
134 args.producerRetryMax = num
135 }
136}
137
138func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
139 return func(args *SaramaClient) {
140 args.producerRetryBackOff = duration
141 }
142}
143
144func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500145 return func(args *SaramaClient) {
146 args.producerReturnErrors = opt
147 }
148}
149
khenaidoo90847922018-12-03 14:47:51 -0500150func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500151 return func(args *SaramaClient) {
152 args.producerReturnSuccess = opt
153 }
154}
155
156func ConsumerMaxWait(wait int) SaramaClientOption {
157 return func(args *SaramaClient) {
158 args.consumerMaxwait = wait
159 }
160}
161
162func MaxProcessingTime(pTime int) SaramaClientOption {
163 return func(args *SaramaClient) {
164 args.maxProcessingTime = pTime
165 }
166}
167
168func NumPartitions(number int) SaramaClientOption {
169 return func(args *SaramaClient) {
170 args.numPartitions = number
171 }
172}
173
174func NumReplicas(number int) SaramaClientOption {
175 return func(args *SaramaClient) {
176 args.numReplicas = number
177 }
178}
179
180func AutoCreateTopic(opt bool) SaramaClientOption {
181 return func(args *SaramaClient) {
182 args.autoCreateTopic = opt
183 }
184}
185
Abhilash S.L294ff522019-06-26 18:14:33 +0530186func MetadatMaxRetries(retry int) SaramaClientOption {
187 return func(args *SaramaClient) {
188 args.metadataMaxRetry = retry
189 }
190}
191
Scott Bakeree6a0872019-10-29 15:59:52 -0700192func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
193 return func(args *SaramaClient) {
194 args.livenessChannelInterval = opt
195 }
196}
197
khenaidoo43c82122018-11-22 18:38:28 -0500198func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
199 client := &SaramaClient{
Neha Sharmad1387da2020-05-07 20:07:28 +0000200 KafkaAddress: DefaultKafkaAddress,
khenaidoo43c82122018-11-22 18:38:28 -0500201 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500202 client.consumerType = DefaultConsumerType
203 client.producerFlushFrequency = DefaultProducerFlushFrequency
204 client.producerFlushMessages = DefaultProducerFlushMessages
205 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
206 client.producerReturnErrors = DefaultProducerReturnErrors
207 client.producerReturnSuccess = DefaultProducerReturnSuccess
208 client.producerRetryMax = DefaultProducerRetryMax
209 client.producerRetryBackOff = DefaultProducerRetryBackoff
210 client.consumerMaxwait = DefaultConsumerMaxwait
211 client.maxProcessingTime = DefaultMaxProcessingTime
212 client.numPartitions = DefaultNumberPartitions
213 client.numReplicas = DefaultNumberReplicas
214 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530215 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700216 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500217
218 for _, option := range opts {
219 option(client)
220 }
221
khenaidooca301322019-01-09 23:06:32 -0500222 client.groupConsumers = make(map[string]*scc.Consumer)
223
khenaidoo43c82122018-11-22 18:38:28 -0500224 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500225 client.topicLockMap = make(map[string]*sync.RWMutex)
226 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500227 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700228
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800229 // healthy and alive until proven otherwise
Scott Bakeree6a0872019-10-29 15:59:52 -0700230 client.alive = true
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800231 client.healthy = true
Scott Bakeree6a0872019-10-29 15:59:52 -0700232
khenaidoo43c82122018-11-22 18:38:28 -0500233 return client
234}
235
Rohan Agrawal31f21802020-06-12 05:38:46 +0000236func (sc *SaramaClient) Start(ctx context.Context) error {
237 logger.Info(ctx, "Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500238
239 // Create the Done channel
240 sc.doneCh = make(chan int, 1)
241
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500242 var err error
243
khenaidoob3244212019-08-27 14:32:27 -0400244 // Add a cleanup in case of failure to startup
245 defer func() {
246 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000247 sc.Stop(ctx)
khenaidoob3244212019-08-27 14:32:27 -0400248 }
249 }()
250
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500251 // Create the Cluster Admin
Rohan Agrawal31f21802020-06-12 05:38:46 +0000252 if err = sc.createClusterAdmin(ctx); err != nil {
253 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500254 return err
255 }
256
khenaidoo43c82122018-11-22 18:38:28 -0500257 // Create the Publisher
Rohan Agrawal31f21802020-06-12 05:38:46 +0000258 if err := sc.createPublisher(ctx); err != nil {
259 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500260 return err
261 }
262
khenaidooca301322019-01-09 23:06:32 -0500263 if sc.consumerType == DefaultConsumerType {
264 // Create the master consumers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 if err := sc.createConsumer(ctx); err != nil {
266 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidooca301322019-01-09 23:06:32 -0500267 return err
268 }
khenaidoo43c82122018-11-22 18:38:28 -0500269 }
270
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500271 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500272 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
273
Rohan Agrawal31f21802020-06-12 05:38:46 +0000274 logger.Info(ctx, "kafka-sarama-client-started")
khenaidooca301322019-01-09 23:06:32 -0500275
Scott Bakeree6a0872019-10-29 15:59:52 -0700276 sc.started = true
277
khenaidoo43c82122018-11-22 18:38:28 -0500278 return nil
279}
280
Rohan Agrawal31f21802020-06-12 05:38:46 +0000281func (sc *SaramaClient) Stop(ctx context.Context) {
282 logger.Info(ctx, "stopping-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500283
Scott Bakeree6a0872019-10-29 15:59:52 -0700284 sc.started = false
285
khenaidoo43c82122018-11-22 18:38:28 -0500286 //Send a message over the done channel to close all long running routines
287 sc.doneCh <- 1
288
khenaidoo43c82122018-11-22 18:38:28 -0500289 if sc.producer != nil {
290 if err := sc.producer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000291 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500292 }
293 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500294
khenaidoo43c82122018-11-22 18:38:28 -0500295 if sc.consumer != nil {
296 if err := sc.consumer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000297 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500298 }
299 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500300
khenaidooca301322019-01-09 23:06:32 -0500301 for key, val := range sc.groupConsumers {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000302 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
khenaidooca301322019-01-09 23:06:32 -0500303 if err := val.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000304 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500305 }
306 }
307
308 if sc.cAdmin != nil {
309 if err := sc.cAdmin.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000310 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500311 }
312 }
313
314 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500315 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500316
Rohan Agrawal31f21802020-06-12 05:38:46 +0000317 logger.Info(ctx, "sarama-client-stopped")
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500318}
319
khenaidooca301322019-01-09 23:06:32 -0500320//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
321// the invoking function must hold the lock
Rohan Agrawal31f21802020-06-12 05:38:46 +0000322func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500323 // Set the topic details
324 topicDetail := &sarama.TopicDetail{}
325 topicDetail.NumPartitions = int32(numPartition)
326 topicDetail.ReplicationFactor = int16(repFactor)
327 topicDetail.ConfigEntries = make(map[string]*string)
328 topicDetails := make(map[string]*sarama.TopicDetail)
329 topicDetails[topic.Name] = topicDetail
330
331 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
332 if err == sarama.ErrTopicAlreadyExists {
333 // Not an error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000334 logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500335 return nil
336 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000337 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500338 return err
339 }
340 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
341 // do so.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000342 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500343 return nil
344}
345
khenaidooca301322019-01-09 23:06:32 -0500346//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
347// ensure no two go routines are performing operations on the same topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000348func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
khenaidooca301322019-01-09 23:06:32 -0500349 sc.lockTopic(topic)
350 defer sc.unLockTopic(topic)
351
Rohan Agrawal31f21802020-06-12 05:38:46 +0000352 return sc.createTopic(ctx, topic, numPartition, repFactor)
khenaidooca301322019-01-09 23:06:32 -0500353}
354
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500355//DeleteTopic removes a topic from the kafka Broker
Rohan Agrawal31f21802020-06-12 05:38:46 +0000356func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500360 // Remove the topic from the broker
361 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
362 if err == sarama.ErrUnknownTopicOrPartition {
363 // Not an error as does not exist
Rohan Agrawal31f21802020-06-12 05:38:46 +0000364 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500365 return nil
366 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000367 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500368 return err
369 }
370
371 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000372 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
373 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500374 return err
375 }
376 return nil
377}
378
379// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
380// messages from that topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000381func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500382 sc.lockTopic(topic)
383 defer sc.unLockTopic(topic)
384
Rohan Agrawal31f21802020-06-12 05:38:46 +0000385 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500386
387 // If a consumers already exist for that topic then resuse it
388 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000389 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500390 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500391 ch := make(chan *ic.InterContainerMessage)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000392 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500393 return ch, nil
394 }
395
396 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500397 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500398 var err error
399
400 // Use the consumerType option to figure out the type of consumer to launch
401 if sc.consumerType == PartitionConsumer {
402 if sc.autoCreateTopic {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000403 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
404 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500405 return nil, err
406 }
407 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000408 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
409 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500410 return nil, err
411 }
412 } else if sc.consumerType == GroupCustomer {
413 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
414 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500415 //if sc.autoCreateTopic {
416 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000417 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500418 // return nil, err
419 // }
420 //}
421 //groupId := sc.consumerGroupName
422 groupId := getGroupId(kvArgs...)
423 // Include the group prefix
424 if groupId != "" {
425 groupId = sc.consumerGroupPrefix + groupId
426 } else {
427 // Need to use a unique group Id per topic
428 groupId = sc.consumerGroupPrefix + topic.Name
429 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000430 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
431 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500432 return nil, err
433 }
khenaidooca301322019-01-09 23:06:32 -0500434
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500435 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000436 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500437 return nil, errors.New("unknown-consumer-type")
438 }
439
440 return consumerListeningChannel, nil
441}
442
443//UnSubscribe unsubscribe a consumer from a given topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000444func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500445 sc.lockTopic(topic)
446 defer sc.unLockTopic(topic)
447
Rohan Agrawal31f21802020-06-12 05:38:46 +0000448 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500449 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000450 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
451 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500452 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000453 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
454 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500455 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500456 return err
457}
458
Rohan Agrawal31f21802020-06-12 05:38:46 +0000459func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
npujar467fe752020-01-16 20:17:45 +0530460 sc.metadataCallback = callback
461}
462
Rohan Agrawal31f21802020-06-12 05:38:46 +0000463func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000468 sc.livenessMutex.Lock()
469 defer sc.livenessMutex.Unlock()
Scott Bakeree6a0872019-10-29 15:59:52 -0700470 if sc.liveness != nil {
471 if sc.alive != alive {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000472 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Bakeree6a0872019-10-29 15:59:52 -0700473 sc.liveness <- alive
474 sc.lastLivenessTime = time.Now()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000475 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000476 logger.Info(ctx, "update-liveness-channel-because-interval")
Scott Bakeree6a0872019-10-29 15:59:52 -0700477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
479 }
480 }
481
482 // Only emit a log message when the state changes
483 if sc.alive != alive {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000484 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Bakeree6a0872019-10-29 15:59:52 -0700485 sc.alive = alive
486 }
487}
488
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800489// Once unhealthy, we never go back
Rohan Agrawal31f21802020-06-12 05:38:46 +0000490func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800491 sc.healthy = false
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000492 sc.healthinessMutex.Lock()
493 defer sc.healthinessMutex.Unlock()
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800494 if sc.healthiness != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000495 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800496 sc.healthiness <- sc.healthy
497 }
498}
499
Rohan Agrawal31f21802020-06-12 05:38:46 +0000500func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000501 // Sarama producers and consumers encapsulate the error inside
502 // a ProducerError or ConsumerError struct.
503 if prodError, ok := err.(*sarama.ProducerError); ok {
504 err = prodError.Err
505 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
506 err = consumerError.Err
507 }
508
509 // Sarama-Cluster will compose the error into a ClusterError struct,
510 // which we can't do a compare by reference. To handle that, we the
511 // best we can do is compare the error strings.
512
513 switch err.Error() {
514 case context.DeadlineExceeded.Error():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000515 logger.Info(ctx, "is-liveness-error-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000516 return true
517 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000518 logger.Info(ctx, "is-liveness-error-no-brokers")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000519 return true
520 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000521 logger.Info(ctx, "is-liveness-error-shutting-down")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000522 return true
523 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000524 logger.Info(ctx, "is-liveness-error-not-available")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000525 return true
526 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000527 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000528 return true
529 }
530
531 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000532 logger.Info(ctx, "is-liveness-error-connection-refused")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800533 return true
534 }
535
536 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000537 logger.Info(ctx, "is-liveness-error-io-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000538 return true
539 }
540
541 // Other errors shouldn't trigger a loss of liveness
542
Rohan Agrawal31f21802020-06-12 05:38:46 +0000543 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000544
545 return false
546}
547
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500548// send formats and sends the request onto the kafka messaging bus.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000549func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500550
551 // Assert message is a proto message
552 var protoMsg proto.Message
553 var ok bool
554 // ascertain the value interface type is a proto.Message
555 if protoMsg, ok = msg.(proto.Message); !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000556 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000557 return fmt.Errorf("not-a-proto-msg-%s", msg)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500558 }
559
560 var marshalled []byte
561 var err error
562 // Create the Sarama producer message
563 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000564 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500565 return err
566 }
567 key := ""
568 if len(keys) > 0 {
569 key = keys[0] // Only the first key is relevant
570 }
571 kafkaMsg := &sarama.ProducerMessage{
572 Topic: topic.Name,
573 Key: sarama.StringEncoder(key),
574 Value: sarama.ByteEncoder(marshalled),
575 }
576
577 // Send message to kafka
578 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500579 // Wait for result
580 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
581 select {
582 case ok := <-sc.producer.Successes():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000583 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
584 sc.updateLiveness(ctx, true)
khenaidoo90847922018-12-03 14:47:51 -0500585 case notOk := <-sc.producer.Errors():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000586 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
587 if sc.isLivenessError(ctx, notOk) {
588 sc.updateLiveness(ctx, false)
Scott Bakeree6a0872019-10-29 15:59:52 -0700589 }
590 return notOk
591 }
592 return nil
593}
594
595// Enable the liveness monitor channel. This channel will report
596// a "true" or "false" on every publish, which indicates whether
597// or not the channel is still live. This channel is then picked up
598// by the service (i.e. rw_core / ro_core) to update readiness status
599// and/or take other actions.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000600func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
601 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Bakeree6a0872019-10-29 15:59:52 -0700602 if enable {
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000603 sc.livenessMutex.Lock()
604 defer sc.livenessMutex.Unlock()
Scott Bakeree6a0872019-10-29 15:59:52 -0700605 if sc.liveness == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000606 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Bakeree6a0872019-10-29 15:59:52 -0700607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612 // post intial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000626func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
627 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800628 if enable {
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000629 sc.healthinessMutex.Lock()
630 defer sc.healthinessMutex.Unlock()
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800631 if sc.healthiness == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000632 logger.Info(ctx, "kafka-create-healthiness-channel")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post intial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // liveness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
Scott Bakeree6a0872019-10-29 15:59:52 -0700649// send an empty message on the liveness channel to check whether connectivity has
650// been restored.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000651func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Bakeree6a0872019-10-29 15:59:52 -0700652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000667 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
668 sc.updateLiveness(ctx, true)
Scott Bakeree6a0872019-10-29 15:59:52 -0700669 case notOk := <-sc.producer.Errors():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000670 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
671 if sc.isLivenessError(ctx, notOk) {
672 sc.updateLiveness(ctx, false)
Scott Bakeree6a0872019-10-29 15:59:52 -0700673 }
khenaidoo90847922018-12-03 14:47:51 -0500674 return notOk
675 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500676 return nil
677}
678
khenaidooca301322019-01-09 23:06:32 -0500679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
khenaidoo731697e2019-01-29 16:03:29 -0500689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
Rohan Agrawal31f21802020-06-12 05:38:46 +0000699func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500700 config := sarama.NewConfig()
701 config.Version = sarama.V1_0_0_0
702
703 // Create a cluster Admin
704 var cAdmin sarama.ClusterAdmin
705 var err error
Neha Sharmad1387da2020-05-07 20:07:28 +0000706 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000707 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500708 return err
709 }
710 sc.cAdmin = cAdmin
711 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500712}
713
khenaidood2b6df92018-12-13 16:37:20 -0500714func (sc *SaramaClient) lockTopic(topic *Topic) {
715 sc.lockOfTopicLockMap.Lock()
716 if _, exist := sc.topicLockMap[topic.Name]; exist {
717 sc.lockOfTopicLockMap.Unlock()
718 sc.topicLockMap[topic.Name].Lock()
719 } else {
720 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
721 sc.lockOfTopicLockMap.Unlock()
722 sc.topicLockMap[topic.Name].Lock()
723 }
724}
725
726func (sc *SaramaClient) unLockTopic(topic *Topic) {
727 sc.lockOfTopicLockMap.Lock()
728 defer sc.lockOfTopicLockMap.Unlock()
729 if _, exist := sc.topicLockMap[topic.Name]; exist {
730 sc.topicLockMap[topic.Name].Unlock()
731 }
732}
733
khenaidoo43c82122018-11-22 18:38:28 -0500734func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
735 sc.lockTopicToConsumerChannelMap.Lock()
736 defer sc.lockTopicToConsumerChannelMap.Unlock()
737 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
738 sc.topicToConsumerChannelMap[id] = arg
739 }
740}
741
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500742func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400743 sc.lockTopicToConsumerChannelMap.RLock()
744 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500745
746 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
747 return consumerCh
748 }
749 return nil
750}
751
Rohan Agrawal31f21802020-06-12 05:38:46 +0000752func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500753 sc.lockTopicToConsumerChannelMap.Lock()
754 defer sc.lockTopicToConsumerChannelMap.Unlock()
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 consumerCh.channels = append(consumerCh.channels, ch)
757 return
758 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000759 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500760}
761
762//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000763func closeConsumers(ctx context.Context, consumers []interface{}) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500764 var err error
765 for _, consumer := range consumers {
766 // Is it a partition consumers?
767 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
768 if errTemp := partionConsumer.Close(); errTemp != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000769 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500770 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
771 // This can occur on race condition
772 err = nil
773 } else {
774 err = errTemp
775 }
776 }
777 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
778 if errTemp := groupConsumer.Close(); errTemp != nil {
779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 }
787 }
788 return err
khenaidoo43c82122018-11-22 18:38:28 -0500789}
790
Rohan Agrawal31f21802020-06-12 05:38:46 +0000791func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500792 sc.lockTopicToConsumerChannelMap.Lock()
793 defer sc.lockTopicToConsumerChannelMap.Unlock()
794 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
795 // Channel will be closed in the removeChannel method
Rohan Agrawal31f21802020-06-12 05:38:46 +0000796 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500797 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500798 if len(consumerCh.channels) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000799 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
800 err := closeConsumers(ctx, consumerCh.consumers)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500801 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500802 delete(sc.topicToConsumerChannelMap, topic.Name)
803 return err
804 }
805 return nil
806 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000807 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500808 return errors.New("topic-does-not-exist")
809}
810
Rohan Agrawal31f21802020-06-12 05:38:46 +0000811func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500812 sc.lockTopicToConsumerChannelMap.Lock()
813 defer sc.lockTopicToConsumerChannelMap.Unlock()
814 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
815 for _, ch := range consumerCh.channels {
816 // Channel will be closed in the removeChannel method
Rohan Agrawal31f21802020-06-12 05:38:46 +0000817 removeChannel(ctx, consumerCh.channels, ch)
khenaidoo43c82122018-11-22 18:38:28 -0500818 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000819 err := closeConsumers(ctx, consumerCh.consumers)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500820 //if err == sarama.ErrUnknownTopicOrPartition {
821 // // Not an error
822 // err = nil
823 //}
824 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500825 delete(sc.topicToConsumerChannelMap, topic.Name)
826 return err
827 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000828 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500829 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500830}
831
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500832//createPublisher creates the publisher which is used to send a message onto kafka
Rohan Agrawal31f21802020-06-12 05:38:46 +0000833func (sc *SaramaClient) createPublisher(ctx context.Context) error {
khenaidoo43c82122018-11-22 18:38:28 -0500834 // This Creates the publisher
835 config := sarama.NewConfig()
Himani Chawla27f81212021-04-07 11:37:47 +0530836 config.Version = sarama.V1_0_0_0
khenaidoo43c82122018-11-22 18:38:28 -0500837 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500838 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
839 config.Producer.Flush.Messages = sc.producerFlushMessages
840 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
841 config.Producer.Return.Errors = sc.producerReturnErrors
842 config.Producer.Return.Successes = sc.producerReturnSuccess
843 //config.Producer.RequiredAcks = sarama.WaitForAll
844 config.Producer.RequiredAcks = sarama.WaitForLocal
845
Neha Sharmad1387da2020-05-07 20:07:28 +0000846 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500847
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500848 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000849 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500850 return err
851 } else {
852 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500853 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000854 logger.Info(ctx, "Kafka-publisher-created")
khenaidoo43c82122018-11-22 18:38:28 -0500855 return nil
856}
857
Rohan Agrawal31f21802020-06-12 05:38:46 +0000858func (sc *SaramaClient) createConsumer(ctx context.Context) error {
khenaidoo43c82122018-11-22 18:38:28 -0500859 config := sarama.NewConfig()
Himani Chawla27f81212021-04-07 11:37:47 +0530860 config.Version = sarama.V1_0_0_0
khenaidoo43c82122018-11-22 18:38:28 -0500861 config.Consumer.Return.Errors = true
862 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500863 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
864 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500865 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530866 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmad1387da2020-05-07 20:07:28 +0000867 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500868
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500869 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000870 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500871 return err
872 } else {
873 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500874 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000875 logger.Info(ctx, "Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500876 return nil
877}
878
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500879// createGroupConsumer creates a consumers group
Rohan Agrawal31f21802020-06-12 05:38:46 +0000880func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500881 config := scc.NewConfig()
Himani Chawla27f81212021-04-07 11:37:47 +0530882 config.Version = sarama.V1_0_0_0
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500883 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500884 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700885 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
886 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500887 //config.Group.Return.Notifications = false
888 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
889 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500890 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500891 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharmad1387da2020-05-07 20:07:28 +0000892 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500893
khenaidoo43c82122018-11-22 18:38:28 -0500894 topics := []string{topic.Name}
895 var consumer *scc.Consumer
896 var err error
897
khenaidooca301322019-01-09 23:06:32 -0500898 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000899 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500900 return nil, err
901 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000902 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500903
904 //sc.groupConsumers[topic.Name] = consumer
905 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500906 return consumer, nil
907}
908
khenaidoo43c82122018-11-22 18:38:28 -0500909// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500910// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500911func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500912 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400913 sc.lockTopicToConsumerChannelMap.RLock()
khenaidoo43c82122018-11-22 18:38:28 -0500914 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500915 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500916 c <- protoMessage
917 }(ch)
918 }
npujar467fe752020-01-16 20:17:45 +0530919 sc.lockTopicToConsumerChannelMap.RUnlock()
920
921 if callback := sc.metadataCallback; callback != nil {
Scott Baker504b4802020-04-17 10:12:20 -0700922 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
923 callback(protoMessage.Header.FromTopic, ts)
npujar467fe752020-01-16 20:17:45 +0530924 }
khenaidoo43c82122018-11-22 18:38:28 -0500925}
926
Rohan Agrawal31f21802020-06-12 05:38:46 +0000927func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
928 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500929startloop:
930 for {
931 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500932 case err, ok := <-consumer.Errors():
933 if ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000934 if sc.isLivenessError(ctx, err) {
935 sc.updateLiveness(ctx, false)
936 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500937 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500938 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500939 // Channel is closed
940 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500941 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500942 case msg, ok := <-consumer.Messages():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000943 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500944 if !ok {
945 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500946 break startloop
947 }
948 msgBody := msg.Value
Rohan Agrawal31f21802020-06-12 05:38:46 +0000949 sc.updateLiveness(ctx, true)
950 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500951 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500952 if err := proto.Unmarshal(msgBody, icm); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000953 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500954 continue
955 }
956 go sc.dispatchToConsumers(consumerChnls, icm)
957 case <-sc.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000958 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500959 break startloop
960 }
961 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000962 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
963 sc.setUnhealthy(ctx)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500964}
965
Rohan Agrawal31f21802020-06-12 05:38:46 +0000966func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
967 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500968
969startloop:
970 for {
971 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500972 case err, ok := <-consumer.Errors():
973 if ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000974 if sc.isLivenessError(ctx, err) {
975 sc.updateLiveness(ctx, false)
Devmalya Paulc594bb32019-11-06 07:34:27 +0000976 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000977 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500978 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000979 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500980 // channel is closed
981 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500982 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500983 case msg, ok := <-consumer.Messages():
984 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000985 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500986 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500987 break startloop
988 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000989 sc.updateLiveness(ctx, true)
990 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500991 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500992 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500993 if err := proto.Unmarshal(msgBody, icm); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000994 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500995 continue
996 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500997 go sc.dispatchToConsumers(consumerChnls, icm)
998 consumer.MarkOffset(msg, "")
999 case ntf := <-consumer.Notifications():
Rohan Agrawal31f21802020-06-12 05:38:46 +00001000 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -05001001 case <-sc.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +00001002 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001003 break startloop
1004 }
1005 }
Rohan Agrawal31f21802020-06-12 05:38:46 +00001006 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
1007 sc.setUnhealthy(ctx)
khenaidoo43c82122018-11-22 18:38:28 -05001008}
1009
Rohan Agrawal31f21802020-06-12 05:38:46 +00001010func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1011 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001012 var consumerCh *consumerChannels
1013 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001014 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001015 return errors.New("consumers-not-exist")
1016 }
1017 // For each consumer listening for that topic, start a consumption loop
1018 for _, consumer := range consumerCh.consumers {
1019 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001020 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001021 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001022 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001023 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001024 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001025 return errors.New("invalid-consumer")
1026 }
1027 }
1028 return nil
1029}
1030
1031//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1032//// for that topic. It also starts the routine that listens for messages on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +00001033func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001034 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001035 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001036
Rohan Agrawal31f21802020-06-12 05:38:46 +00001037 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1038 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001039 return nil, err
1040 }
1041
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001042 consumersIf := make([]interface{}, 0)
1043 for _, pConsumer := range pConsumers {
1044 consumersIf = append(consumersIf, pConsumer)
1045 }
1046
1047 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001048 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001049 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001050 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001051 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001052 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001053 }
1054
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001055 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001056 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1057
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001058 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001059 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001060 if err := sc.startConsumers(ctx, topic); err != nil {
1061 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001062 "topic": topic,
1063 "error": err})
1064 }
1065 }()
khenaidoo43c82122018-11-22 18:38:28 -05001066
1067 return consumerListeningChannel, nil
1068}
1069
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001070// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1071// for that topic. It also starts the routine that listens for messages on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +00001072func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001073 // TODO: Replace this development partition consumers with a group consumers
1074 var pConsumer *scc.Consumer
1075 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +00001076 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1077 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001078 return nil, err
1079 }
1080 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1081 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001082 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001083 cc := &consumerChannels{
1084 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001085 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001086 }
1087
1088 // Add the consumers channel to the map
1089 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1090
1091 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001092 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001093 if err := sc.startConsumers(ctx, topic); err != nil {
1094 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001095 "topic": topic,
1096 "error": err})
1097 }
1098 }()
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001099
1100 return consumerListeningChannel, nil
1101}
1102
Rohan Agrawal31f21802020-06-12 05:38:46 +00001103func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1104 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001105 partitionList, err := sc.consumer.Partitions(topic.Name)
1106 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001107 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001108 return nil, err
1109 }
1110
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001111 pConsumers := make([]sarama.PartitionConsumer, 0)
1112 for _, partition := range partitionList {
1113 var pConsumer sarama.PartitionConsumer
1114 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001115 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001116 return nil, err
1117 }
1118 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001119 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001120 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001121}
1122
Rohan Agrawal31f21802020-06-12 05:38:46 +00001123func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001124 var i int
khenaidoo79232702018-12-04 11:00:41 -05001125 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001126 for i, channel = range channels {
1127 if channel == ch {
1128 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1129 close(channel)
Rohan Agrawal31f21802020-06-12 05:38:46 +00001130 logger.Debug(ctx, "channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001131 return channels[:len(channels)-1]
1132 }
1133 }
1134 return channels
1135}
khenaidoo7ff26c72019-01-16 14:55:48 -05001136
khenaidoo7ff26c72019-01-16 14:55:48 -05001137func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1138 sc.lockOfGroupConsumers.Lock()
1139 defer sc.lockOfGroupConsumers.Unlock()
1140 if _, exist := sc.groupConsumers[topic]; !exist {
1141 sc.groupConsumers[topic] = consumer
1142 }
1143}
1144
Rohan Agrawal31f21802020-06-12 05:38:46 +00001145func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
khenaidoo7ff26c72019-01-16 14:55:48 -05001146 sc.lockOfGroupConsumers.Lock()
1147 defer sc.lockOfGroupConsumers.Unlock()
1148 if _, exist := sc.groupConsumers[topic]; exist {
1149 consumer := sc.groupConsumers[topic]
1150 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001151 if err := consumer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001152 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -05001153 return err
1154 }
1155 }
1156 return nil
1157}