blob: 185f6ec6f247d1718fe0dd4cbba7d3b3a996fa81 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "strings"
23 "sync"
24 "time"
25
Scott Baker2c1c4822019-10-16 11:02:41 -070026 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
khenaidoo26721882021-08-11 17:42:52 -040031 "github.com/opencord/voltha-lib-go/v7/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070032)
33
Scott Baker2c1c4822019-10-16 11:02:41 -070034// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
35// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
36//consumer or a group consumer
37type consumerChannels struct {
38 consumers []interface{}
khenaidoo26721882021-08-11 17:42:52 -040039 channels []chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -070040}
41
Kent Hagermanccfa2132019-12-17 13:29:34 -050042// static check to ensure SaramaClient implements Client
43var _ Client = &SaramaClient{}
44
Scott Baker2c1c4822019-10-16 11:02:41 -070045// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
Neha Sharmadd9af392020-04-28 09:03:57 +000048 KafkaAddress string
Scott Baker2c1c4822019-10-16 11:02:41 -070049 producer sarama.AsyncProducer
50 consumer sarama.Consumer
51 groupConsumers map[string]*scc.Consumer
52 lockOfGroupConsumers sync.RWMutex
53 consumerGroupPrefix string
54 consumerType int
55 consumerGroupName string
56 producerFlushFrequency int
57 producerFlushMessages int
58 producerFlushMaxmessages int
59 producerRetryMax int
60 producerRetryBackOff time.Duration
61 producerReturnSuccess bool
62 producerReturnErrors bool
63 consumerMaxwait int
64 maxProcessingTime int
65 numPartitions int
66 numReplicas int
67 autoCreateTopic bool
68 doneCh chan int
Scott Baker84a55ce2020-04-17 10:11:30 -070069 metadataCallback func(fromTopic string, timestamp time.Time)
Scott Baker2c1c4822019-10-16 11:02:41 -070070 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
74 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070075 alive bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070076 livenessMutex sync.Mutex
Scott Baker104b67d2019-10-29 15:56:27 -070077 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070081 healthinessMutex sync.Mutex
Scott Baker0fef6982019-12-12 09:49:42 -080082 healthy bool
83 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070084}
85
86type SaramaClientOption func(*SaramaClient)
87
Neha Sharmadd9af392020-04-28 09:03:57 +000088func Address(address string) SaramaClientOption {
Scott Baker2c1c4822019-10-16 11:02:41 -070089 return func(args *SaramaClient) {
Neha Sharmadd9af392020-04-28 09:03:57 +000090 args.KafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -070091 }
92}
93
94func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
Scott Baker104b67d2019-10-29 15:56:27 -0700190func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.livenessChannelInterval = opt
193 }
194}
195
Scott Baker2c1c4822019-10-16 11:02:41 -0700196func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
197 client := &SaramaClient{
Neha Sharmadd9af392020-04-28 09:03:57 +0000198 KafkaAddress: DefaultKafkaAddress,
Scott Baker2c1c4822019-10-16 11:02:41 -0700199 }
200 client.consumerType = DefaultConsumerType
201 client.producerFlushFrequency = DefaultProducerFlushFrequency
202 client.producerFlushMessages = DefaultProducerFlushMessages
203 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
204 client.producerReturnErrors = DefaultProducerReturnErrors
205 client.producerReturnSuccess = DefaultProducerReturnSuccess
206 client.producerRetryMax = DefaultProducerRetryMax
207 client.producerRetryBackOff = DefaultProducerRetryBackoff
208 client.consumerMaxwait = DefaultConsumerMaxwait
209 client.maxProcessingTime = DefaultMaxProcessingTime
210 client.numPartitions = DefaultNumberPartitions
211 client.numReplicas = DefaultNumberReplicas
212 client.autoCreateTopic = DefaultAutoCreateTopic
213 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700214 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700215
216 for _, option := range opts {
217 option(client)
218 }
219
220 client.groupConsumers = make(map[string]*scc.Consumer)
221
222 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
223 client.topicLockMap = make(map[string]*sync.RWMutex)
224 client.lockOfTopicLockMap = sync.RWMutex{}
225 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700226
Scott Baker0fef6982019-12-12 09:49:42 -0800227 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700228 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800229 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700230
Scott Baker2c1c4822019-10-16 11:02:41 -0700231 return client
232}
233
Neha Sharma94f16a92020-06-26 04:17:55 +0000234func (sc *SaramaClient) Start(ctx context.Context) error {
235 logger.Info(ctx, "Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700236
237 // Create the Done channel
238 sc.doneCh = make(chan int, 1)
239
240 var err error
241
242 // Add a cleanup in case of failure to startup
243 defer func() {
244 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000245 sc.Stop(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700246 }
247 }()
248
249 // Create the Cluster Admin
Neha Sharma94f16a92020-06-26 04:17:55 +0000250 if err = sc.createClusterAdmin(ctx); err != nil {
251 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700252 return err
253 }
254
255 // Create the Publisher
Neha Sharma94f16a92020-06-26 04:17:55 +0000256 if err := sc.createPublisher(ctx); err != nil {
257 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 return err
259 }
260
261 if sc.consumerType == DefaultConsumerType {
262 // Create the master consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000263 if err := sc.createConsumer(ctx); err != nil {
264 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700265 return err
266 }
267 }
268
269 // Create the topic to consumers/channel map
270 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
271
Neha Sharma94f16a92020-06-26 04:17:55 +0000272 logger.Info(ctx, "kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700273
Scott Baker104b67d2019-10-29 15:56:27 -0700274 sc.started = true
275
Scott Baker2c1c4822019-10-16 11:02:41 -0700276 return nil
277}
278
Neha Sharma94f16a92020-06-26 04:17:55 +0000279func (sc *SaramaClient) Stop(ctx context.Context) {
280 logger.Info(ctx, "stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700281
Scott Baker104b67d2019-10-29 15:56:27 -0700282 sc.started = false
283
Scott Baker2c1c4822019-10-16 11:02:41 -0700284 //Send a message over the done channel to close all long running routines
285 sc.doneCh <- 1
286
287 if sc.producer != nil {
288 if err := sc.producer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000289 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 }
291 }
292
293 if sc.consumer != nil {
294 if err := sc.consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000295 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700296 }
297 }
298
299 for key, val := range sc.groupConsumers {
Neha Sharma94f16a92020-06-26 04:17:55 +0000300 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700301 if err := val.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000302 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700303 }
304 }
305
306 if sc.cAdmin != nil {
307 if err := sc.cAdmin.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000308 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 }
310 }
311
312 //TODO: Clear the consumers map
313 //sc.clearConsumerChannelMap()
314
Neha Sharma94f16a92020-06-26 04:17:55 +0000315 logger.Info(ctx, "sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700316}
317
318//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
319// the invoking function must hold the lock
Neha Sharma94f16a92020-06-26 04:17:55 +0000320func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700321 // Set the topic details
322 topicDetail := &sarama.TopicDetail{}
323 topicDetail.NumPartitions = int32(numPartition)
324 topicDetail.ReplicationFactor = int16(repFactor)
325 topicDetail.ConfigEntries = make(map[string]*string)
326 topicDetails := make(map[string]*sarama.TopicDetail)
327 topicDetails[topic.Name] = topicDetail
328
329 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
330 if err == sarama.ErrTopicAlreadyExists {
331 // Not an error
Neha Sharma94f16a92020-06-26 04:17:55 +0000332 logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700333 return nil
334 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000335 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700336 return err
337 }
338 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
339 // do so.
Neha Sharma94f16a92020-06-26 04:17:55 +0000340 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 return nil
342}
343
344//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
345// ensure no two go routines are performing operations on the same topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000346func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700347 sc.lockTopic(topic)
348 defer sc.unLockTopic(topic)
349
Neha Sharma94f16a92020-06-26 04:17:55 +0000350 return sc.createTopic(ctx, topic, numPartition, repFactor)
Scott Baker2c1c4822019-10-16 11:02:41 -0700351}
352
353//DeleteTopic removes a topic from the kafka Broker
Neha Sharma94f16a92020-06-26 04:17:55 +0000354func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
358 // Remove the topic from the broker
359 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
360 if err == sarama.ErrUnknownTopicOrPartition {
361 // Not an error as does not exist
Neha Sharma94f16a92020-06-26 04:17:55 +0000362 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700363 return nil
364 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000365 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 return err
367 }
368
369 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Neha Sharma94f16a92020-06-26 04:17:55 +0000370 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
371 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700372 return err
373 }
374 return nil
375}
376
377// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
378// messages from that topic
khenaidoo26721882021-08-11 17:42:52 -0400379func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700380 sc.lockTopic(topic)
381 defer sc.unLockTopic(topic)
382
Neha Sharma94f16a92020-06-26 04:17:55 +0000383 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700384
385 // If a consumers already exist for that topic then resuse it
386 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000387 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700388 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo26721882021-08-11 17:42:52 -0400389 ch := make(chan proto.Message)
Neha Sharma94f16a92020-06-26 04:17:55 +0000390 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700391 return ch, nil
392 }
393
394 // Register for the topic and set it up
khenaidoo26721882021-08-11 17:42:52 -0400395 var consumerListeningChannel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -0700396 var err error
397
398 // Use the consumerType option to figure out the type of consumer to launch
399 if sc.consumerType == PartitionConsumer {
400 if sc.autoCreateTopic {
Neha Sharma94f16a92020-06-26 04:17:55 +0000401 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
402 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700403 return nil, err
404 }
405 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000406 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
407 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700408 return nil, err
409 }
410 } else if sc.consumerType == GroupCustomer {
411 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
412 // does not consume from a precreated topic in some scenarios
413 //if sc.autoCreateTopic {
414 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000415 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700416 // return nil, err
417 // }
418 //}
419 //groupId := sc.consumerGroupName
420 groupId := getGroupId(kvArgs...)
421 // Include the group prefix
422 if groupId != "" {
423 groupId = sc.consumerGroupPrefix + groupId
424 } else {
425 // Need to use a unique group Id per topic
426 groupId = sc.consumerGroupPrefix + topic.Name
427 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000428 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
429 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700430 return nil, err
431 }
432
433 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000434 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700435 return nil, errors.New("unknown-consumer-type")
436 }
437
438 return consumerListeningChannel, nil
439}
440
441//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo26721882021-08-11 17:42:52 -0400442func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700443 sc.lockTopic(topic)
444 defer sc.unLockTopic(topic)
445
Neha Sharma94f16a92020-06-26 04:17:55 +0000446 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700447 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000448 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
449 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700450 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000451 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
452 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 }
454 return err
455}
456
Neha Sharma94f16a92020-06-26 04:17:55 +0000457func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
Kent Hagermanccfa2132019-12-17 13:29:34 -0500458 sc.metadataCallback = callback
459}
460
Neha Sharma94f16a92020-06-26 04:17:55 +0000461func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Baker104b67d2019-10-29 15:56:27 -0700462 // Post a consistent stream of liveness data to the channel,
463 // so that in a live state, the core does not timeout and
464 // send a forced liveness message. Production of liveness
465 // events to the channel is rate-limited by livenessChannelInterval.
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700466 sc.livenessMutex.Lock()
467 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700468 if sc.liveness != nil {
469 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000470 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800473 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Neha Sharma94f16a92020-06-26 04:17:55 +0000474 logger.Info(ctx, "update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000482 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700483 sc.alive = alive
484 }
485}
486
Scott Baker0fef6982019-12-12 09:49:42 -0800487// Once unhealthy, we never go back
Neha Sharma94f16a92020-06-26 04:17:55 +0000488func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
Scott Baker0fef6982019-12-12 09:49:42 -0800489 sc.healthy = false
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700490 sc.healthinessMutex.Lock()
491 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800492 if sc.healthiness != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000493 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800494 sc.healthiness <- sc.healthy
495 }
496}
497
Neha Sharma94f16a92020-06-26 04:17:55 +0000498func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800499 // Sarama producers and consumers encapsulate the error inside
500 // a ProducerError or ConsumerError struct.
501 if prodError, ok := err.(*sarama.ProducerError); ok {
502 err = prodError.Err
503 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
504 err = consumerError.Err
505 }
506
507 // Sarama-Cluster will compose the error into a ClusterError struct,
508 // which we can't do a compare by reference. To handle that, we the
509 // best we can do is compare the error strings.
510
511 switch err.Error() {
512 case context.DeadlineExceeded.Error():
Neha Sharma94f16a92020-06-26 04:17:55 +0000513 logger.Info(ctx, "is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800514 return true
515 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Neha Sharma94f16a92020-06-26 04:17:55 +0000516 logger.Info(ctx, "is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800517 return true
518 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Neha Sharma94f16a92020-06-26 04:17:55 +0000519 logger.Info(ctx, "is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800520 return true
521 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Neha Sharma94f16a92020-06-26 04:17:55 +0000522 logger.Info(ctx, "is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800523 return true
524 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Neha Sharma94f16a92020-06-26 04:17:55 +0000525 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800526 return true
527 }
528
529 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Neha Sharma94f16a92020-06-26 04:17:55 +0000530 logger.Info(ctx, "is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800531 return true
532 }
533
Scott Baker718bee02020-01-07 09:52:02 -0800534 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Neha Sharma94f16a92020-06-26 04:17:55 +0000535 logger.Info(ctx, "is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800536 return true
537 }
538
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800539 // Other errors shouldn't trigger a loss of liveness
540
Neha Sharma94f16a92020-06-26 04:17:55 +0000541 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800542
543 return false
544}
545
Scott Baker2c1c4822019-10-16 11:02:41 -0700546// send formats and sends the request onto the kafka messaging bus.
Neha Sharma94f16a92020-06-26 04:17:55 +0000547func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700548
549 // Assert message is a proto message
550 var protoMsg proto.Message
551 var ok bool
552 // ascertain the value interface type is a proto.Message
553 if protoMsg, ok = msg.(proto.Message); !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000554 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800555 return fmt.Errorf("not-a-proto-msg-%s", msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700556 }
557
558 var marshalled []byte
559 var err error
560 // Create the Sarama producer message
561 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000562 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700563 return err
564 }
565 key := ""
566 if len(keys) > 0 {
567 key = keys[0] // Only the first key is relevant
568 }
569 kafkaMsg := &sarama.ProducerMessage{
570 Topic: topic.Name,
571 Key: sarama.StringEncoder(key),
572 Value: sarama.ByteEncoder(marshalled),
573 }
574
575 // Send message to kafka
576 sc.producer.Input() <- kafkaMsg
577 // Wait for result
578 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
579 select {
580 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000581 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
582 sc.updateLiveness(ctx, true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700583 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000584 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
585 if sc.isLivenessError(ctx, notOk) {
586 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700587 }
588 return notOk
589 }
590 return nil
591}
592
593// Enable the liveness monitor channel. This channel will report
594// a "true" or "false" on every publish, which indicates whether
595// or not the channel is still live. This channel is then picked up
596// by the service (i.e. rw_core / ro_core) to update readiness status
597// and/or take other actions.
Neha Sharma94f16a92020-06-26 04:17:55 +0000598func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
599 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700600 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700601 sc.livenessMutex.Lock()
602 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700603 if sc.liveness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000604 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700605 // At least 1, so we can immediately post to it without blocking
606 // Setting a bigger number (10) allows the monitor to fall behind
607 // without blocking others. The monitor shouldn't really fall
608 // behind...
609 sc.liveness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400610 // post initial state to the channel
Scott Baker104b67d2019-10-29 15:56:27 -0700611 sc.liveness <- sc.alive
612 }
613 } else {
614 // TODO: Think about whether we need the ability to turn off
615 // liveness monitoring
616 panic("Turning off liveness reporting is not supported")
617 }
618 return sc.liveness
619}
620
Scott Baker0fef6982019-12-12 09:49:42 -0800621// Enable the Healthiness monitor channel. This channel will report "false"
622// if the kafka consumers die, or some other problem occurs which is
623// catastrophic that would require re-creating the client.
Neha Sharma94f16a92020-06-26 04:17:55 +0000624func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
625 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800626 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700627 sc.healthinessMutex.Lock()
628 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800629 if sc.healthiness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000630 logger.Info(ctx, "kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800631 // At least 1, so we can immediately post to it without blocking
632 // Setting a bigger number (10) allows the monitor to fall behind
633 // without blocking others. The monitor shouldn't really fall
634 // behind...
635 sc.healthiness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400636 // post initial state to the channel
Scott Baker0fef6982019-12-12 09:49:42 -0800637 sc.healthiness <- sc.healthy
638 }
639 } else {
640 // TODO: Think about whether we need the ability to turn off
641 // liveness monitoring
642 panic("Turning off healthiness reporting is not supported")
643 }
644 return sc.healthiness
645}
646
Scott Baker104b67d2019-10-29 15:56:27 -0700647// send an empty message on the liveness channel to check whether connectivity has
648// been restored.
Neha Sharma94f16a92020-06-26 04:17:55 +0000649func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Baker104b67d2019-10-29 15:56:27 -0700650 if !sc.started {
651 return fmt.Errorf("SendLiveness() called while not started")
652 }
653
654 kafkaMsg := &sarama.ProducerMessage{
655 Topic: "_liveness_test",
656 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
657 }
658
659 // Send message to kafka
660 sc.producer.Input() <- kafkaMsg
661 // Wait for result
662 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
663 select {
664 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000665 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
666 sc.updateLiveness(ctx, true)
Scott Baker104b67d2019-10-29 15:56:27 -0700667 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000668 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
669 if sc.isLivenessError(ctx, notOk) {
670 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700671 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700672 return notOk
673 }
674 return nil
675}
676
677// getGroupId returns the group id from the key-value args.
678func getGroupId(kvArgs ...*KVArg) string {
679 for _, arg := range kvArgs {
680 if arg.Key == GroupIdKey {
681 return arg.Value.(string)
682 }
683 }
684 return ""
685}
686
687// getOffset returns the offset from the key-value args.
688func getOffset(kvArgs ...*KVArg) int64 {
689 for _, arg := range kvArgs {
690 if arg.Key == Offset {
691 return arg.Value.(int64)
692 }
693 }
694 return sarama.OffsetNewest
695}
696
Neha Sharma94f16a92020-06-26 04:17:55 +0000697func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700698 config := sarama.NewConfig()
699 config.Version = sarama.V1_0_0_0
700
701 // Create a cluster Admin
702 var cAdmin sarama.ClusterAdmin
703 var err error
Neha Sharmadd9af392020-04-28 09:03:57 +0000704 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000705 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
Scott Baker2c1c4822019-10-16 11:02:41 -0700706 return err
707 }
708 sc.cAdmin = cAdmin
709 return nil
710}
711
712func (sc *SaramaClient) lockTopic(topic *Topic) {
713 sc.lockOfTopicLockMap.Lock()
714 if _, exist := sc.topicLockMap[topic.Name]; exist {
715 sc.lockOfTopicLockMap.Unlock()
716 sc.topicLockMap[topic.Name].Lock()
717 } else {
718 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
719 sc.lockOfTopicLockMap.Unlock()
720 sc.topicLockMap[topic.Name].Lock()
721 }
722}
723
724func (sc *SaramaClient) unLockTopic(topic *Topic) {
725 sc.lockOfTopicLockMap.Lock()
726 defer sc.lockOfTopicLockMap.Unlock()
727 if _, exist := sc.topicLockMap[topic.Name]; exist {
728 sc.topicLockMap[topic.Name].Unlock()
729 }
730}
731
732func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
733 sc.lockTopicToConsumerChannelMap.Lock()
734 defer sc.lockTopicToConsumerChannelMap.Unlock()
735 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
736 sc.topicToConsumerChannelMap[id] = arg
737 }
738}
739
Scott Baker2c1c4822019-10-16 11:02:41 -0700740func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
741 sc.lockTopicToConsumerChannelMap.RLock()
742 defer sc.lockTopicToConsumerChannelMap.RUnlock()
743
744 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
745 return consumerCh
746 }
747 return nil
748}
749
khenaidoo26721882021-08-11 17:42:52 -0400750func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700751 sc.lockTopicToConsumerChannelMap.Lock()
752 defer sc.lockTopicToConsumerChannelMap.Unlock()
753 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
754 consumerCh.channels = append(consumerCh.channels, ch)
755 return
756 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000757 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700758}
759
760//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000761func closeConsumers(ctx context.Context, consumers []interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700762 var err error
763 for _, consumer := range consumers {
764 // Is it a partition consumers?
765 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
766 if errTemp := partionConsumer.Close(); errTemp != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000767 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700768 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
769 // This can occur on race condition
770 err = nil
771 } else {
772 err = errTemp
773 }
774 }
775 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
776 if errTemp := groupConsumer.Close(); errTemp != nil {
777 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
778 // This can occur on race condition
779 err = nil
780 } else {
781 err = errTemp
782 }
783 }
784 }
785 }
786 return err
787}
788
khenaidoo26721882021-08-11 17:42:52 -0400789func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700790 sc.lockTopicToConsumerChannelMap.Lock()
791 defer sc.lockTopicToConsumerChannelMap.Unlock()
792 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
793 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000794 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700795 // If there are no more channels then we can close the consumers itself
796 if len(consumerCh.channels) == 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000797 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
798 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700799 //err := consumerCh.consumers.Close()
800 delete(sc.topicToConsumerChannelMap, topic.Name)
801 return err
802 }
803 return nil
804 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000805 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700806 return errors.New("topic-does-not-exist")
807}
808
Neha Sharma94f16a92020-06-26 04:17:55 +0000809func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700810 sc.lockTopicToConsumerChannelMap.Lock()
811 defer sc.lockTopicToConsumerChannelMap.Unlock()
812 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
813 for _, ch := range consumerCh.channels {
814 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000815 removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700816 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000817 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700818 //if err == sarama.ErrUnknownTopicOrPartition {
819 // // Not an error
820 // err = nil
821 //}
822 //err := consumerCh.consumers.Close()
823 delete(sc.topicToConsumerChannelMap, topic.Name)
824 return err
825 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000826 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700827 return nil
828}
829
Scott Baker2c1c4822019-10-16 11:02:41 -0700830//createPublisher creates the publisher which is used to send a message onto kafka
Neha Sharma94f16a92020-06-26 04:17:55 +0000831func (sc *SaramaClient) createPublisher(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700832 // This Creates the publisher
833 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530834 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700835 config.Producer.Partitioner = sarama.NewRandomPartitioner
836 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
837 config.Producer.Flush.Messages = sc.producerFlushMessages
838 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
839 config.Producer.Return.Errors = sc.producerReturnErrors
840 config.Producer.Return.Successes = sc.producerReturnSuccess
841 //config.Producer.RequiredAcks = sarama.WaitForAll
842 config.Producer.RequiredAcks = sarama.WaitForLocal
843
Neha Sharmadd9af392020-04-28 09:03:57 +0000844 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700845
846 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000847 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700848 return err
849 } else {
850 sc.producer = producer
851 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000852 logger.Info(ctx, "Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700853 return nil
854}
855
Neha Sharma94f16a92020-06-26 04:17:55 +0000856func (sc *SaramaClient) createConsumer(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700857 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530858 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
864 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmadd9af392020-04-28 09:03:57 +0000865 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700866
867 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000868 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700869 return err
870 } else {
871 sc.consumer = consumer
872 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000873 logger.Info(ctx, "Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700874 return nil
875}
876
877// createGroupConsumer creates a consumers group
Neha Sharma94f16a92020-06-26 04:17:55 +0000878func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700879 config := scc.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530880 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700881 config.ClientID = uuid.New().String()
882 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700883 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
884 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700885 //config.Group.Return.Notifications = false
886 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
887 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
888 config.Consumer.Offsets.Initial = initialOffset
889 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharmadd9af392020-04-28 09:03:57 +0000890 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700891
892 topics := []string{topic.Name}
893 var consumer *scc.Consumer
894 var err error
895
896 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000897 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700898 return nil, err
899 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000900 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700901
902 //sc.groupConsumers[topic.Name] = consumer
903 sc.addToGroupConsumers(topic.Name, consumer)
904 return consumer, nil
905}
906
907// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
908// topic via the unique channel each subscriber received during subscription
khenaidoo26721882021-08-11 17:42:52 -0400909func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700910 // Need to go over all channels and publish messages to them - do we need to copy msg?
911 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700912 for _, ch := range consumerCh.channels {
khenaidoo26721882021-08-11 17:42:52 -0400913 go func(c chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700914 c <- protoMessage
915 }(ch)
916 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500917 sc.lockTopicToConsumerChannelMap.RUnlock()
918
919 if callback := sc.metadataCallback; callback != nil {
khenaidoo26721882021-08-11 17:42:52 -0400920 callback(fromTopic, ts)
Kent Hagermanccfa2132019-12-17 13:29:34 -0500921 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700922}
923
// consumeFromAPartition runs the receive loop for one partition consumer of
// topic. The loop ends when the consumer's Errors or Messages channel is closed
// or sc.doneCh becomes readable. Errors that isLivenessError classifies as
// liveness errors mark the client not live and are logged; other errors are
// ignored. Each received message marks the client live, is decoded with
// proto.Unmarshal, and is dispatched asynchronously to consumerChnls. When the
// loop exits, setUnhealthy is called.
func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				// Only liveness-relevant errors are acted on (and logged).
				if sc.isLivenessError(ctx, err) {
					sc.updateLiveness(ctx, false)
					logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
				}
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			sc.updateLiveness(ctx, true)
			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			// NOTE(review): protoMsg is a nil proto.Message interface with no
			// concrete type, so proto.Unmarshal has nothing to decode into. With
			// golang/protobuf this is expected to panic (Unmarshal calls
			// m.Reset()) rather than return an error. Confirm, and decide how the
			// target message type should be determined.
			var protoMsg proto.Message
			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
		case <-sc.doneCh:
			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy(ctx)
}
962
// consumeGroupMessages runs the receive loop for a sarama-cluster group
// consumer of topic. The loop ends when the consumer's Errors or Messages
// channel is closed or sc.doneCh becomes readable. Every error is logged;
// errors that isLivenessError classifies as liveness errors also mark the
// client not live. Each received message marks the client live, is decoded
// with proto.Unmarshal, is dispatched asynchronously to consumerChnls, and then
// has its offset marked. Group notifications are only logged. When the loop
// exits, setUnhealthy is called.
func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				if sc.isLivenessError(ctx, err) {
					sc.updateLiveness(ctx, false)
				}
				logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
				// Channel closed
				break startloop
			}
			sc.updateLiveness(ctx, true)
			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			// NOTE(review): protoMsg is a nil proto.Message interface with no
			// concrete type, so proto.Unmarshal has nothing to decode into. With
			// golang/protobuf this is expected to panic (Unmarshal calls
			// m.Reset()) rather than return an error. Confirm, and decide how the
			// target message type should be determined.
			var protoMsg proto.Message
			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
				// The offset of an undecodable message is not marked.
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
			// The offset is marked once dispatch has been started; this does
			// not wait for listeners to receive the message.
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			// No ok-check here: a closed Notifications channel is not detected.
			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy(ctx)
}
1006
Neha Sharma94f16a92020-06-26 04:17:55 +00001007func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1008 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001009 var consumerCh *consumerChannels
1010 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001011 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001012 return errors.New("consumers-not-exist")
1013 }
1014 // For each consumer listening for that topic, start a consumption loop
1015 for _, consumer := range consumerCh.consumers {
1016 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001017 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001018 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001019 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001020 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +00001021 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001022 return errors.New("invalid-consumer")
1023 }
1024 }
1025 return nil
1026}
1027
1028//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1029//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001030func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001031 var pConsumers []sarama.PartitionConsumer
1032 var err error
1033
Neha Sharma94f16a92020-06-26 04:17:55 +00001034 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1035 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001036 return nil, err
1037 }
1038
1039 consumersIf := make([]interface{}, 0)
1040 for _, pConsumer := range pConsumers {
1041 consumersIf = append(consumersIf, pConsumer)
1042 }
1043
1044 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1045 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001046 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001047 cc := &consumerChannels{
1048 consumers: consumersIf,
khenaidoo26721882021-08-11 17:42:52 -04001049 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001050 }
1051
1052 // Add the consumers channel to the map
1053 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1054
1055 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001056 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001057 if err := sc.startConsumers(ctx, topic); err != nil {
1058 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001059 "topic": topic,
1060 "error": err})
1061 }
1062 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001063
1064 return consumerListeningChannel, nil
1065}
1066
1067// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1068// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001069func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001070 // TODO: Replace this development partition consumers with a group consumers
1071 var pConsumer *scc.Consumer
1072 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +00001073 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1074 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001075 return nil, err
1076 }
1077 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1078 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001079 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001080 cc := &consumerChannels{
1081 consumers: []interface{}{pConsumer},
khenaidoo26721882021-08-11 17:42:52 -04001082 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001083 }
1084
1085 // Add the consumers channel to the map
1086 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1087
1088 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001089 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001090 if err := sc.startConsumers(ctx, topic); err != nil {
1091 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001092 "topic": topic,
1093 "error": err})
1094 }
1095 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001096
1097 return consumerListeningChannel, nil
1098}
1099
Neha Sharma94f16a92020-06-26 04:17:55 +00001100func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1101 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001102 partitionList, err := sc.consumer.Partitions(topic.Name)
1103 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001104 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001105 return nil, err
1106 }
1107
1108 pConsumers := make([]sarama.PartitionConsumer, 0)
1109 for _, partition := range partitionList {
1110 var pConsumer sarama.PartitionConsumer
1111 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001112 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001113 return nil, err
1114 }
1115 pConsumers = append(pConsumers, pConsumer)
1116 }
1117 return pConsumers, nil
1118}
1119
khenaidoo26721882021-08-11 17:42:52 -04001120func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
Scott Baker2c1c4822019-10-16 11:02:41 -07001121 var i int
khenaidoo26721882021-08-11 17:42:52 -04001122 var channel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -07001123 for i, channel = range channels {
1124 if channel == ch {
1125 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1126 close(channel)
Neha Sharma94f16a92020-06-26 04:17:55 +00001127 logger.Debug(ctx, "channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001128 return channels[:len(channels)-1]
1129 }
1130 }
1131 return channels
1132}
1133
1134func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1135 sc.lockOfGroupConsumers.Lock()
1136 defer sc.lockOfGroupConsumers.Unlock()
1137 if _, exist := sc.groupConsumers[topic]; !exist {
1138 sc.groupConsumers[topic] = consumer
1139 }
1140}
1141
Neha Sharma94f16a92020-06-26 04:17:55 +00001142func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -07001143 sc.lockOfGroupConsumers.Lock()
1144 defer sc.lockOfGroupConsumers.Unlock()
1145 if _, exist := sc.groupConsumers[topic]; exist {
1146 consumer := sc.groupConsumers[topic]
1147 delete(sc.groupConsumers, topic)
1148 if err := consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001149 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001150 return err
1151 }
1152 }
1153 return nil
1154}