blob: d1a1a3a3f8261610a39e85abdf88c300317dffd0 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
Joey Armstrong9cdee9f2024-01-03 04:56:14 -05002* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
Scott Baker2c1c4822019-10-16 11:02:41 -07003
Joey Armstrong7f8436c2023-07-09 20:23:27 -04004* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
Scott Baker2c1c4822019-10-16 11:02:41 -07007
Joey Armstrong7f8436c2023-07-09 20:23:27 -04008* http://www.apache.org/licenses/LICENSE-2.0
Scott Baker2c1c4822019-10-16 11:02:41 -07009
Joey Armstrong7f8436c2023-07-09 20:23:27 -040010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
Scott Baker2c1c4822019-10-16 11:02:41 -070015 */
Akash Reddy Kankanala05aff182025-05-06 12:57:32 +053016//nolint:staticcheck
Scott Baker2c1c4822019-10-16 11:02:41 -070017package kafka
18
19import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080020 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070021 "errors"
22 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070023 "strings"
24 "sync"
25 "time"
26
Scott Baker2c1c4822019-10-16 11:02:41 -070027 "github.com/Shopify/sarama"
28 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080029 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070030 "github.com/golang/protobuf/proto"
31 "github.com/google/uuid"
khenaidoo26721882021-08-11 17:42:52 -040032 "github.com/opencord/voltha-lib-go/v7/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
Scott Baker2c1c4822019-10-16 11:02:41 -070035// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
36// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
Joey Armstrong7f8436c2023-07-09 20:23:27 -040037// consumer or a group consumer
Scott Baker2c1c4822019-10-16 11:02:41 -070038type consumerChannels struct {
39 consumers []interface{}
khenaidoo26721882021-08-11 17:42:52 -040040 channels []chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -070041}
42
Kent Hagermanccfa2132019-12-17 13:29:34 -050043// static check to ensure SaramaClient implements Client
44var _ Client = &SaramaClient{}
45
// SaramaClient represents the messaging proxy: a Client implementation built
// on the Shopify sarama library, with bsm/sarama-cluster used for group
// consumers. Construct it with NewSaramaClient and call Start before use.
type SaramaClient struct {
	// cAdmin is the cluster admin used by createTopic and DeleteTopic.
	cAdmin sarama.ClusterAdmin
	// KafkaAddress is the broker address handed to sarama (set via Address).
	KafkaAddress string
	// producer publishes the messages sent by Send and SendLiveness.
	producer sarama.AsyncProducer
	// consumer is the master partition consumer; presumably populated by
	// createConsumer, which Start calls only for DefaultConsumerType.
	consumer sarama.Consumer
	// groupConsumers holds the sarama-cluster group consumers; Stop logs its
	// keys as topic names. Presumably guarded by lockOfGroupConsumers
	// (note that Stop iterates it without taking the lock).
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	// consumerGroupPrefix is prepended to every group id built by Subscribe.
	consumerGroupPrefix string
	// consumerType selects PartitionConsumer or GroupCustomer in Subscribe.
	consumerType int
	// consumerGroupName is set by the ConsumerGroupName option; its only use
	// in Subscribe is commented out.
	consumerGroupName string
	// Producer/consumer tuning knobs, initialised from the Default* constants
	// or the matching options; presumably applied to the sarama config in
	// createPublisher/createConsumer (not visible here).
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	consumerMaxwait          int
	maxProcessingTime        int
	// Topic settings used by Subscribe when auto-creating a topic for a
	// partition consumer.
	numPartitions   int
	numReplicas     int
	autoCreateTopic bool
	// doneCh is created by Start with a buffer of one; Stop sends on it to
	// tell long-running routines to exit.
	doneCh chan int
	// metadataCallback is registered through SubscribeForMetadata.
	metadataCallback func(fromTopic string, timestamp time.Time)
	// topicToConsumerChannelMap tracks consumers and subscriber channels per
	// topic and is created by Start; presumably guarded by
	// lockTopicToConsumerChannelMap.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// topicLockMap presumably backs lockTopic/unLockTopic with one lock per
	// topic; presumably guarded by lockOfTopicLockMap.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	// metadataMaxRetry is set via MetadatMaxRetries; its consumer is not
	// visible here.
	metadataMaxRetry int
	// Liveness state. livenessMutex is locked by updateLiveness and
	// EnableLivenessChannel while they use these fields; livenessChannelInterval
	// rate-limits repeated posts of an unchanged state.
	alive                   bool
	livenessMutex           sync.Mutex
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	// started is set at the end of a successful Start, cleared by Stop and
	// checked by SendLiveness.
	started bool
	// Healthiness state; healthinessMutex is locked by setUnhealthy and
	// EnableHealthinessChannel.
	healthinessMutex sync.Mutex
	healthy          bool
	healthiness      chan bool
}
86
87type SaramaClientOption func(*SaramaClient)
88
Neha Sharmadd9af392020-04-28 09:03:57 +000089func Address(address string) SaramaClientOption {
Scott Baker2c1c4822019-10-16 11:02:41 -070090 return func(args *SaramaClient) {
Neha Sharmadd9af392020-04-28 09:03:57 +000091 args.KafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -070092 }
93}
94
95func ConsumerGroupPrefix(prefix string) SaramaClientOption {
96 return func(args *SaramaClient) {
97 args.consumerGroupPrefix = prefix
98 }
99}
100
101func ConsumerGroupName(name string) SaramaClientOption {
102 return func(args *SaramaClient) {
103 args.consumerGroupName = name
104 }
105}
106
107func ConsumerType(consumer int) SaramaClientOption {
108 return func(args *SaramaClient) {
109 args.consumerType = consumer
110 }
111}
112
113func ProducerFlushFrequency(frequency int) SaramaClientOption {
114 return func(args *SaramaClient) {
115 args.producerFlushFrequency = frequency
116 }
117}
118
119func ProducerFlushMessages(num int) SaramaClientOption {
120 return func(args *SaramaClient) {
121 args.producerFlushMessages = num
122 }
123}
124
125func ProducerFlushMaxMessages(num int) SaramaClientOption {
126 return func(args *SaramaClient) {
127 args.producerFlushMaxmessages = num
128 }
129}
130
131func ProducerMaxRetries(num int) SaramaClientOption {
132 return func(args *SaramaClient) {
133 args.producerRetryMax = num
134 }
135}
136
137func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
138 return func(args *SaramaClient) {
139 args.producerRetryBackOff = duration
140 }
141}
142
143func ProducerReturnOnErrors(opt bool) SaramaClientOption {
144 return func(args *SaramaClient) {
145 args.producerReturnErrors = opt
146 }
147}
148
149func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
150 return func(args *SaramaClient) {
151 args.producerReturnSuccess = opt
152 }
153}
154
155func ConsumerMaxWait(wait int) SaramaClientOption {
156 return func(args *SaramaClient) {
157 args.consumerMaxwait = wait
158 }
159}
160
161func MaxProcessingTime(pTime int) SaramaClientOption {
162 return func(args *SaramaClient) {
163 args.maxProcessingTime = pTime
164 }
165}
166
167func NumPartitions(number int) SaramaClientOption {
168 return func(args *SaramaClient) {
169 args.numPartitions = number
170 }
171}
172
173func NumReplicas(number int) SaramaClientOption {
174 return func(args *SaramaClient) {
175 args.numReplicas = number
176 }
177}
178
179func AutoCreateTopic(opt bool) SaramaClientOption {
180 return func(args *SaramaClient) {
181 args.autoCreateTopic = opt
182 }
183}
184
185func MetadatMaxRetries(retry int) SaramaClientOption {
186 return func(args *SaramaClient) {
187 args.metadataMaxRetry = retry
188 }
189}
190
Scott Baker104b67d2019-10-29 15:56:27 -0700191func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
192 return func(args *SaramaClient) {
193 args.livenessChannelInterval = opt
194 }
195}
196
Scott Baker2c1c4822019-10-16 11:02:41 -0700197func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
198 client := &SaramaClient{
Neha Sharmadd9af392020-04-28 09:03:57 +0000199 KafkaAddress: DefaultKafkaAddress,
Scott Baker2c1c4822019-10-16 11:02:41 -0700200 }
201 client.consumerType = DefaultConsumerType
202 client.producerFlushFrequency = DefaultProducerFlushFrequency
203 client.producerFlushMessages = DefaultProducerFlushMessages
204 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
205 client.producerReturnErrors = DefaultProducerReturnErrors
206 client.producerReturnSuccess = DefaultProducerReturnSuccess
207 client.producerRetryMax = DefaultProducerRetryMax
208 client.producerRetryBackOff = DefaultProducerRetryBackoff
209 client.consumerMaxwait = DefaultConsumerMaxwait
210 client.maxProcessingTime = DefaultMaxProcessingTime
211 client.numPartitions = DefaultNumberPartitions
212 client.numReplicas = DefaultNumberReplicas
213 client.autoCreateTopic = DefaultAutoCreateTopic
214 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700215 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700216
217 for _, option := range opts {
218 option(client)
219 }
220
221 client.groupConsumers = make(map[string]*scc.Consumer)
222
223 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
224 client.topicLockMap = make(map[string]*sync.RWMutex)
225 client.lockOfTopicLockMap = sync.RWMutex{}
226 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700227
Scott Baker0fef6982019-12-12 09:49:42 -0800228 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700229 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800230 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700231
Scott Baker2c1c4822019-10-16 11:02:41 -0700232 return client
233}
234
Neha Sharma94f16a92020-06-26 04:17:55 +0000235func (sc *SaramaClient) Start(ctx context.Context) error {
236 logger.Info(ctx, "Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700237
238 // Create the Done channel
239 sc.doneCh = make(chan int, 1)
240
241 var err error
242
243 // Add a cleanup in case of failure to startup
244 defer func() {
245 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000246 sc.Stop(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700247 }
248 }()
249
250 // Create the Cluster Admin
Neha Sharma94f16a92020-06-26 04:17:55 +0000251 if err = sc.createClusterAdmin(ctx); err != nil {
252 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700253 return err
254 }
255
256 // Create the Publisher
Neha Sharma94f16a92020-06-26 04:17:55 +0000257 if err := sc.createPublisher(ctx); err != nil {
258 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700259 return err
260 }
261
262 if sc.consumerType == DefaultConsumerType {
263 // Create the master consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000264 if err := sc.createConsumer(ctx); err != nil {
265 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700266 return err
267 }
268 }
269
270 // Create the topic to consumers/channel map
271 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
272
Neha Sharma94f16a92020-06-26 04:17:55 +0000273 logger.Info(ctx, "kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700274
Scott Baker104b67d2019-10-29 15:56:27 -0700275 sc.started = true
276
Scott Baker2c1c4822019-10-16 11:02:41 -0700277 return nil
278}
279
Neha Sharma94f16a92020-06-26 04:17:55 +0000280func (sc *SaramaClient) Stop(ctx context.Context) {
281 logger.Info(ctx, "stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700282
Scott Baker104b67d2019-10-29 15:56:27 -0700283 sc.started = false
284
Scott Baker2c1c4822019-10-16 11:02:41 -0700285 //Send a message over the done channel to close all long running routines
286 sc.doneCh <- 1
287
288 if sc.producer != nil {
289 if err := sc.producer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000290 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700291 }
292 }
293
294 if sc.consumer != nil {
295 if err := sc.consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000296 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700297 }
298 }
299
300 for key, val := range sc.groupConsumers {
Neha Sharma94f16a92020-06-26 04:17:55 +0000301 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700302 if err := val.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000303 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700304 }
305 }
306
307 if sc.cAdmin != nil {
308 if err := sc.cAdmin.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000309 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700310 }
311 }
312
313 //TODO: Clear the consumers map
314 //sc.clearConsumerChannelMap()
315
Neha Sharma94f16a92020-06-26 04:17:55 +0000316 logger.Info(ctx, "sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700317}
318
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400319// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
Scott Baker2c1c4822019-10-16 11:02:41 -0700320// the invoking function must hold the lock
Neha Sharma94f16a92020-06-26 04:17:55 +0000321func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700322 // Set the topic details
323 topicDetail := &sarama.TopicDetail{}
324 topicDetail.NumPartitions = int32(numPartition)
325 topicDetail.ReplicationFactor = int16(repFactor)
326 topicDetail.ConfigEntries = make(map[string]*string)
327 topicDetails := make(map[string]*sarama.TopicDetail)
328 topicDetails[topic.Name] = topicDetail
329
330 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
kesavandd85e52b2022-03-15 16:38:08 +0530331 switch typedErr := err.(type) {
332 case *sarama.TopicError:
333 if typedErr.Err == sarama.ErrTopicAlreadyExists {
334 err = nil
335 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700336 }
kesavandd85e52b2022-03-15 16:38:08 +0530337 if err != nil {
338 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
339 return err
340 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 }
342 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
343 // do so.
Neha Sharma94f16a92020-06-26 04:17:55 +0000344 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700345 return nil
346}
347
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400348// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
Scott Baker2c1c4822019-10-16 11:02:41 -0700349// ensure no two go routines are performing operations on the same topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000350func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 sc.lockTopic(topic)
352 defer sc.unLockTopic(topic)
353
Neha Sharma94f16a92020-06-26 04:17:55 +0000354 return sc.createTopic(ctx, topic, numPartition, repFactor)
Scott Baker2c1c4822019-10-16 11:02:41 -0700355}
356
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400357// DeleteTopic removes a topic from the kafka Broker
Neha Sharma94f16a92020-06-26 04:17:55 +0000358func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700359 sc.lockTopic(topic)
360 defer sc.unLockTopic(topic)
361
362 // Remove the topic from the broker
363 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
364 if err == sarama.ErrUnknownTopicOrPartition {
365 // Not an error as does not exist
Neha Sharma94f16a92020-06-26 04:17:55 +0000366 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700367 return nil
368 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000369 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700370 return err
371 }
372
373 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Neha Sharma94f16a92020-06-26 04:17:55 +0000374 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
375 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700376 return err
377 }
378 return nil
379}
380
381// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
382// messages from that topic
khenaidoo26721882021-08-11 17:42:52 -0400383func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700384 sc.lockTopic(topic)
385 defer sc.unLockTopic(topic)
386
Neha Sharma94f16a92020-06-26 04:17:55 +0000387 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700388
389 // If a consumers already exist for that topic then resuse it
390 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000391 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700392 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo26721882021-08-11 17:42:52 -0400393 ch := make(chan proto.Message)
Neha Sharma94f16a92020-06-26 04:17:55 +0000394 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700395 return ch, nil
396 }
397
398 // Register for the topic and set it up
khenaidoo26721882021-08-11 17:42:52 -0400399 var consumerListeningChannel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -0700400 var err error
401
402 // Use the consumerType option to figure out the type of consumer to launch
403 if sc.consumerType == PartitionConsumer {
404 if sc.autoCreateTopic {
Neha Sharma94f16a92020-06-26 04:17:55 +0000405 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
406 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700407 return nil, err
408 }
409 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000410 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
411 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700412 return nil, err
413 }
414 } else if sc.consumerType == GroupCustomer {
415 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
416 // does not consume from a precreated topic in some scenarios
417 //if sc.autoCreateTopic {
418 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000419 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700420 // return nil, err
421 // }
422 //}
423 //groupId := sc.consumerGroupName
424 groupId := getGroupId(kvArgs...)
425 // Include the group prefix
426 if groupId != "" {
427 groupId = sc.consumerGroupPrefix + groupId
428 } else {
429 // Need to use a unique group Id per topic
430 groupId = sc.consumerGroupPrefix + topic.Name
431 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000432 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
433 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700434 return nil, err
435 }
436
437 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000438 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700439 return nil, errors.New("unknown-consumer-type")
440 }
441
442 return consumerListeningChannel, nil
443}
444
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400445// UnSubscribe unsubscribe a consumer from a given topic
khenaidoo26721882021-08-11 17:42:52 -0400446func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700447 sc.lockTopic(topic)
448 defer sc.unLockTopic(topic)
449
Neha Sharma94f16a92020-06-26 04:17:55 +0000450 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700451 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000452 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
453 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700454 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000455 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
456 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700457 }
458 return err
459}
460
Neha Sharma94f16a92020-06-26 04:17:55 +0000461func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
Kent Hagermanccfa2132019-12-17 13:29:34 -0500462 sc.metadataCallback = callback
463}
464
Neha Sharma94f16a92020-06-26 04:17:55 +0000465func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Baker104b67d2019-10-29 15:56:27 -0700466 // Post a consistent stream of liveness data to the channel,
467 // so that in a live state, the core does not timeout and
468 // send a forced liveness message. Production of liveness
469 // events to the channel is rate-limited by livenessChannelInterval.
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700470 sc.livenessMutex.Lock()
471 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700472 if sc.liveness != nil {
473 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000474 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800477 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Akash Soni7b911a02024-06-20 13:06:21 +0530478 logger.Debugf(ctx, "update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700479 sc.liveness <- alive
480 sc.lastLivenessTime = time.Now()
481 }
482 }
483
484 // Only emit a log message when the state changes
485 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000486 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700487 sc.alive = alive
488 }
489}
490
Scott Baker0fef6982019-12-12 09:49:42 -0800491// Once unhealthy, we never go back
Neha Sharma94f16a92020-06-26 04:17:55 +0000492func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
Scott Baker0fef6982019-12-12 09:49:42 -0800493 sc.healthy = false
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700494 sc.healthinessMutex.Lock()
495 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800496 if sc.healthiness != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000497 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800498 sc.healthiness <- sc.healthy
499 }
500}
501
Neha Sharma94f16a92020-06-26 04:17:55 +0000502func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800503 // Sarama producers and consumers encapsulate the error inside
504 // a ProducerError or ConsumerError struct.
505 if prodError, ok := err.(*sarama.ProducerError); ok {
506 err = prodError.Err
507 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
508 err = consumerError.Err
509 }
510
511 // Sarama-Cluster will compose the error into a ClusterError struct,
512 // which we can't do a compare by reference. To handle that, we the
513 // best we can do is compare the error strings.
514
515 switch err.Error() {
516 case context.DeadlineExceeded.Error():
Neha Sharma94f16a92020-06-26 04:17:55 +0000517 logger.Info(ctx, "is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800518 return true
519 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Neha Sharma94f16a92020-06-26 04:17:55 +0000520 logger.Info(ctx, "is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800521 return true
522 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Neha Sharma94f16a92020-06-26 04:17:55 +0000523 logger.Info(ctx, "is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800524 return true
525 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Neha Sharma94f16a92020-06-26 04:17:55 +0000526 logger.Info(ctx, "is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800527 return true
528 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Neha Sharma94f16a92020-06-26 04:17:55 +0000529 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800530 return true
531 }
532
533 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Neha Sharma94f16a92020-06-26 04:17:55 +0000534 logger.Info(ctx, "is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800535 return true
536 }
537
Scott Baker718bee02020-01-07 09:52:02 -0800538 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Neha Sharma94f16a92020-06-26 04:17:55 +0000539 logger.Info(ctx, "is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800540 return true
541 }
542
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800543 // Other errors shouldn't trigger a loss of liveness
544
Neha Sharma94f16a92020-06-26 04:17:55 +0000545 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800546
547 return false
548}
549
Scott Baker2c1c4822019-10-16 11:02:41 -0700550// send formats and sends the request onto the kafka messaging bus.
Neha Sharma94f16a92020-06-26 04:17:55 +0000551func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700552
553 // Assert message is a proto message
554 var protoMsg proto.Message
555 var ok bool
556 // ascertain the value interface type is a proto.Message
557 if protoMsg, ok = msg.(proto.Message); !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000558 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800559 return fmt.Errorf("not-a-proto-msg-%s", msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700560 }
561
562 var marshalled []byte
563 var err error
564 // Create the Sarama producer message
565 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000566 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700567 return err
568 }
569 key := ""
570 if len(keys) > 0 {
571 key = keys[0] // Only the first key is relevant
572 }
573 kafkaMsg := &sarama.ProducerMessage{
574 Topic: topic.Name,
575 Key: sarama.StringEncoder(key),
576 Value: sarama.ByteEncoder(marshalled),
577 }
578
579 // Send message to kafka
580 sc.producer.Input() <- kafkaMsg
581 // Wait for result
582 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
583 select {
584 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000585 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
586 sc.updateLiveness(ctx, true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700587 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000588 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
589 if sc.isLivenessError(ctx, notOk) {
590 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700591 }
592 return notOk
593 }
594 return nil
595}
596
597// Enable the liveness monitor channel. This channel will report
598// a "true" or "false" on every publish, which indicates whether
599// or not the channel is still live. This channel is then picked up
600// by the service (i.e. rw_core / ro_core) to update readiness status
601// and/or take other actions.
Neha Sharma94f16a92020-06-26 04:17:55 +0000602func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
603 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700604 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700605 sc.livenessMutex.Lock()
606 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700607 if sc.liveness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000608 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700609 // At least 1, so we can immediately post to it without blocking
610 // Setting a bigger number (10) allows the monitor to fall behind
611 // without blocking others. The monitor shouldn't really fall
612 // behind...
613 sc.liveness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400614 // post initial state to the channel
Scott Baker104b67d2019-10-29 15:56:27 -0700615 sc.liveness <- sc.alive
616 }
617 } else {
618 // TODO: Think about whether we need the ability to turn off
619 // liveness monitoring
620 panic("Turning off liveness reporting is not supported")
621 }
622 return sc.liveness
623}
624
Scott Baker0fef6982019-12-12 09:49:42 -0800625// Enable the Healthiness monitor channel. This channel will report "false"
626// if the kafka consumers die, or some other problem occurs which is
627// catastrophic that would require re-creating the client.
Neha Sharma94f16a92020-06-26 04:17:55 +0000628func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
629 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800630 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700631 sc.healthinessMutex.Lock()
632 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800633 if sc.healthiness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000634 logger.Info(ctx, "kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800635 // At least 1, so we can immediately post to it without blocking
636 // Setting a bigger number (10) allows the monitor to fall behind
637 // without blocking others. The monitor shouldn't really fall
638 // behind...
639 sc.healthiness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400640 // post initial state to the channel
Scott Baker0fef6982019-12-12 09:49:42 -0800641 sc.healthiness <- sc.healthy
642 }
643 } else {
644 // TODO: Think about whether we need the ability to turn off
645 // liveness monitoring
646 panic("Turning off healthiness reporting is not supported")
647 }
648 return sc.healthiness
649}
650
Scott Baker104b67d2019-10-29 15:56:27 -0700651// send an empty message on the liveness channel to check whether connectivity has
652// been restored.
Neha Sharma94f16a92020-06-26 04:17:55 +0000653func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Baker104b67d2019-10-29 15:56:27 -0700654 if !sc.started {
655 return fmt.Errorf("SendLiveness() called while not started")
656 }
657
658 kafkaMsg := &sarama.ProducerMessage{
659 Topic: "_liveness_test",
660 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
661 }
662
663 // Send message to kafka
664 sc.producer.Input() <- kafkaMsg
665 // Wait for result
666 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
667 select {
668 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000669 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
670 sc.updateLiveness(ctx, true)
Scott Baker104b67d2019-10-29 15:56:27 -0700671 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000672 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
673 if sc.isLivenessError(ctx, notOk) {
674 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700675 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700676 return notOk
677 }
678 return nil
679}
680
681// getGroupId returns the group id from the key-value args.
682func getGroupId(kvArgs ...*KVArg) string {
683 for _, arg := range kvArgs {
684 if arg.Key == GroupIdKey {
685 return arg.Value.(string)
686 }
687 }
688 return ""
689}
690
691// getOffset returns the offset from the key-value args.
692func getOffset(kvArgs ...*KVArg) int64 {
693 for _, arg := range kvArgs {
694 if arg.Key == Offset {
695 return arg.Value.(int64)
696 }
697 }
698 return sarama.OffsetNewest
699}
700
Neha Sharma94f16a92020-06-26 04:17:55 +0000701func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700702 config := sarama.NewConfig()
703 config.Version = sarama.V1_0_0_0
704
705 // Create a cluster Admin
706 var cAdmin sarama.ClusterAdmin
707 var err error
Neha Sharmadd9af392020-04-28 09:03:57 +0000708 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000709 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
Scott Baker2c1c4822019-10-16 11:02:41 -0700710 return err
711 }
712 sc.cAdmin = cAdmin
713 return nil
714}
715
716func (sc *SaramaClient) lockTopic(topic *Topic) {
717 sc.lockOfTopicLockMap.Lock()
718 if _, exist := sc.topicLockMap[topic.Name]; exist {
719 sc.lockOfTopicLockMap.Unlock()
720 sc.topicLockMap[topic.Name].Lock()
721 } else {
722 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
723 sc.lockOfTopicLockMap.Unlock()
724 sc.topicLockMap[topic.Name].Lock()
725 }
726}
727
728func (sc *SaramaClient) unLockTopic(topic *Topic) {
729 sc.lockOfTopicLockMap.Lock()
730 defer sc.lockOfTopicLockMap.Unlock()
731 if _, exist := sc.topicLockMap[topic.Name]; exist {
732 sc.topicLockMap[topic.Name].Unlock()
733 }
734}
735
736func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
737 sc.lockTopicToConsumerChannelMap.Lock()
738 defer sc.lockTopicToConsumerChannelMap.Unlock()
739 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
740 sc.topicToConsumerChannelMap[id] = arg
741 }
742}
743
Scott Baker2c1c4822019-10-16 11:02:41 -0700744func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
745 sc.lockTopicToConsumerChannelMap.RLock()
746 defer sc.lockTopicToConsumerChannelMap.RUnlock()
747
748 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
749 return consumerCh
750 }
751 return nil
752}
753
khenaidoo26721882021-08-11 17:42:52 -0400754func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700755 sc.lockTopicToConsumerChannelMap.Lock()
756 defer sc.lockTopicToConsumerChannelMap.Unlock()
757 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
758 consumerCh.channels = append(consumerCh.channels, ch)
759 return
760 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000761 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700762}
763
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400764// closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000765func closeConsumers(ctx context.Context, consumers []interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700766 var err error
767 for _, consumer := range consumers {
768 // Is it a partition consumers?
769 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
770 if errTemp := partionConsumer.Close(); errTemp != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000771 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700772 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
773 // This can occur on race condition
774 err = nil
775 } else {
776 err = errTemp
777 }
778 }
779 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
780 if errTemp := groupConsumer.Close(); errTemp != nil {
781 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
782 // This can occur on race condition
783 err = nil
784 } else {
785 err = errTemp
786 }
787 }
788 }
789 }
790 return err
791}
792
khenaidoo26721882021-08-11 17:42:52 -0400793func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700794 sc.lockTopicToConsumerChannelMap.Lock()
795 defer sc.lockTopicToConsumerChannelMap.Unlock()
796 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
797 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000798 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700799 // If there are no more channels then we can close the consumers itself
800 if len(consumerCh.channels) == 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000801 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
802 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700803 //err := consumerCh.consumers.Close()
804 delete(sc.topicToConsumerChannelMap, topic.Name)
805 return err
806 }
807 return nil
808 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000809 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700810 return errors.New("topic-does-not-exist")
811}
812
Neha Sharma94f16a92020-06-26 04:17:55 +0000813func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700814 sc.lockTopicToConsumerChannelMap.Lock()
815 defer sc.lockTopicToConsumerChannelMap.Unlock()
816 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
817 for _, ch := range consumerCh.channels {
818 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000819 removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700820 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000821 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700822 //if err == sarama.ErrUnknownTopicOrPartition {
823 // // Not an error
824 // err = nil
825 //}
826 //err := consumerCh.consumers.Close()
827 delete(sc.topicToConsumerChannelMap, topic.Name)
828 return err
829 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000830 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700831 return nil
832}
833
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400834// createPublisher creates the publisher which is used to send a message onto kafka
Neha Sharma94f16a92020-06-26 04:17:55 +0000835func (sc *SaramaClient) createPublisher(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700836 // This Creates the publisher
837 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530838 config.Version = sarama.V1_0_0_0
kesavandd85e52b2022-03-15 16:38:08 +0530839 config.Producer.Partitioner = sarama.NewHashPartitioner
Scott Baker2c1c4822019-10-16 11:02:41 -0700840 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
841 config.Producer.Flush.Messages = sc.producerFlushMessages
842 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
843 config.Producer.Return.Errors = sc.producerReturnErrors
844 config.Producer.Return.Successes = sc.producerReturnSuccess
845 //config.Producer.RequiredAcks = sarama.WaitForAll
846 config.Producer.RequiredAcks = sarama.WaitForLocal
847
Neha Sharmadd9af392020-04-28 09:03:57 +0000848 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700849
850 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000851 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700852 return err
853 } else {
854 sc.producer = producer
855 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000856 logger.Info(ctx, "Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700857 return nil
858}
859
Neha Sharma94f16a92020-06-26 04:17:55 +0000860func (sc *SaramaClient) createConsumer(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700861 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530862 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700863 config.Consumer.Return.Errors = true
864 config.Consumer.Fetch.Min = 1
865 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
866 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
867 config.Consumer.Offsets.Initial = sarama.OffsetNewest
868 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmadd9af392020-04-28 09:03:57 +0000869 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700870
871 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000872 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700873 return err
874 } else {
875 sc.consumer = consumer
876 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000877 logger.Info(ctx, "Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700878 return nil
879}
880
881// createGroupConsumer creates a consumers group
Neha Sharma94f16a92020-06-26 04:17:55 +0000882func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700883 config := scc.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530884 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700885 config.ClientID = uuid.New().String()
886 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700887 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
888 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700889 //config.Group.Return.Notifications = false
890 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
891 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
892 config.Consumer.Offsets.Initial = initialOffset
893 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharmadd9af392020-04-28 09:03:57 +0000894 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700895
896 topics := []string{topic.Name}
897 var consumer *scc.Consumer
898 var err error
899
900 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000901 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700902 return nil, err
903 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000904 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700905
906 //sc.groupConsumers[topic.Name] = consumer
907 sc.addToGroupConsumers(topic.Name, consumer)
908 return consumer, nil
909}
910
911// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
912// topic via the unique channel each subscriber received during subscription
khenaidoo26721882021-08-11 17:42:52 -0400913func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700914 // Need to go over all channels and publish messages to them - do we need to copy msg?
915 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700916 for _, ch := range consumerCh.channels {
khenaidoo26721882021-08-11 17:42:52 -0400917 go func(c chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700918 c <- protoMessage
919 }(ch)
920 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500921 sc.lockTopicToConsumerChannelMap.RUnlock()
922
923 if callback := sc.metadataCallback; callback != nil {
khenaidoo26721882021-08-11 17:42:52 -0400924 callback(fromTopic, ts)
Kent Hagermanccfa2132019-12-17 13:29:34 -0500925 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700926}
927
Neha Sharma94f16a92020-06-26 04:17:55 +0000928func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
929 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700930startloop:
931 for {
932 select {
933 case err, ok := <-consumer.Errors():
934 if ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000935 if sc.isLivenessError(ctx, err) {
936 sc.updateLiveness(ctx, false)
937 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100938 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700939 } else {
940 // Channel is closed
941 break startloop
942 }
943 case msg, ok := <-consumer.Messages():
Neha Sharma94f16a92020-06-26 04:17:55 +0000944 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700945 if !ok {
946 // channel is closed
947 break startloop
948 }
949 msgBody := msg.Value
Neha Sharma94f16a92020-06-26 04:17:55 +0000950 sc.updateLiveness(ctx, true)
951 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo26721882021-08-11 17:42:52 -0400952 var protoMsg proto.Message
953 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000954 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700955 continue
956 }
khenaidoo26721882021-08-11 17:42:52 -0400957 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
Scott Baker2c1c4822019-10-16 11:02:41 -0700958 case <-sc.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000959 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700960 break startloop
961 }
962 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000963 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
964 sc.setUnhealthy(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700965}
966
Neha Sharma94f16a92020-06-26 04:17:55 +0000967func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
968 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700969
970startloop:
971 for {
972 select {
973 case err, ok := <-consumer.Errors():
974 if ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000975 if sc.isLivenessError(ctx, err) {
976 sc.updateLiveness(ctx, false)
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800977 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000978 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700979 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000980 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700981 // channel is closed
982 break startloop
983 }
984 case msg, ok := <-consumer.Messages():
985 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000986 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700987 // Channel closed
988 break startloop
989 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000990 sc.updateLiveness(ctx, true)
991 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700992 msgBody := msg.Value
khenaidoo26721882021-08-11 17:42:52 -0400993 var protoMsg proto.Message
994 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000995 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700996 continue
997 }
khenaidoo26721882021-08-11 17:42:52 -0400998 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
Scott Baker2c1c4822019-10-16 11:02:41 -0700999 consumer.MarkOffset(msg, "")
1000 case ntf := <-consumer.Notifications():
Neha Sharma94f16a92020-06-26 04:17:55 +00001001 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001002 case <-sc.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +00001003 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001004 break startloop
1005 }
1006 }
Neha Sharma94f16a92020-06-26 04:17:55 +00001007 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
1008 sc.setUnhealthy(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -07001009}
1010
Neha Sharma94f16a92020-06-26 04:17:55 +00001011func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1012 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001013 var consumerCh *consumerChannels
1014 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001015 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001016 return errors.New("consumers-not-exist")
1017 }
1018 // For each consumer listening for that topic, start a consumption loop
1019 for _, consumer := range consumerCh.consumers {
1020 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001021 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001022 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001023 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001024 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +00001025 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001026 return errors.New("invalid-consumer")
1027 }
1028 }
1029 return nil
1030}
1031
Joey Armstrong7f8436c2023-07-09 20:23:27 -04001032// // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1033// // for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001034func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001035 var pConsumers []sarama.PartitionConsumer
1036 var err error
1037
Neha Sharma94f16a92020-06-26 04:17:55 +00001038 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1039 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001040 return nil, err
1041 }
1042
1043 consumersIf := make([]interface{}, 0)
1044 for _, pConsumer := range pConsumers {
1045 consumersIf = append(consumersIf, pConsumer)
1046 }
1047
1048 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1049 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001050 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001051 cc := &consumerChannels{
1052 consumers: consumersIf,
khenaidoo26721882021-08-11 17:42:52 -04001053 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001054 }
1055
1056 // Add the consumers channel to the map
1057 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1058
1059 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001060 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001061 if err := sc.startConsumers(ctx, topic); err != nil {
1062 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001063 "topic": topic,
1064 "error": err})
1065 }
1066 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001067
1068 return consumerListeningChannel, nil
1069}
1070
1071// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1072// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001073func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001074 // TODO: Replace this development partition consumers with a group consumers
1075 var pConsumer *scc.Consumer
1076 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +00001077 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1078 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001079 return nil, err
1080 }
1081 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1082 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001083 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001084 cc := &consumerChannels{
1085 consumers: []interface{}{pConsumer},
khenaidoo26721882021-08-11 17:42:52 -04001086 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001087 }
1088
1089 // Add the consumers channel to the map
1090 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1091
1092 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001093 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001094 if err := sc.startConsumers(ctx, topic); err != nil {
1095 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001096 "topic": topic,
1097 "error": err})
1098 }
1099 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001100
1101 return consumerListeningChannel, nil
1102}
1103
Neha Sharma94f16a92020-06-26 04:17:55 +00001104func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1105 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001106 partitionList, err := sc.consumer.Partitions(topic.Name)
1107 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001108 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001109 return nil, err
1110 }
1111
1112 pConsumers := make([]sarama.PartitionConsumer, 0)
1113 for _, partition := range partitionList {
1114 var pConsumer sarama.PartitionConsumer
1115 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001116 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001117 return nil, err
1118 }
1119 pConsumers = append(pConsumers, pConsumer)
1120 }
1121 return pConsumers, nil
1122}
1123
khenaidoo26721882021-08-11 17:42:52 -04001124func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
Scott Baker2c1c4822019-10-16 11:02:41 -07001125 var i int
khenaidoo26721882021-08-11 17:42:52 -04001126 var channel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -07001127 for i, channel = range channels {
1128 if channel == ch {
1129 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1130 close(channel)
Neha Sharma94f16a92020-06-26 04:17:55 +00001131 logger.Debug(ctx, "channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001132 return channels[:len(channels)-1]
1133 }
1134 }
1135 return channels
1136}
1137
1138func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1139 sc.lockOfGroupConsumers.Lock()
1140 defer sc.lockOfGroupConsumers.Unlock()
1141 if _, exist := sc.groupConsumers[topic]; !exist {
1142 sc.groupConsumers[topic] = consumer
1143 }
1144}
1145
Neha Sharma94f16a92020-06-26 04:17:55 +00001146func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -07001147 sc.lockOfGroupConsumers.Lock()
1148 defer sc.lockOfGroupConsumers.Unlock()
1149 if _, exist := sc.groupConsumers[topic]; exist {
1150 consumer := sc.groupConsumers[topic]
1151 delete(sc.groupConsumers, topic)
1152 if err := consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001153 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001154 return err
1155 }
1156 }
1157 return nil
1158}
kesavandd85e52b2022-03-15 16:38:08 +05301159
1160func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
1161
1162 config := sarama.NewConfig()
1163 client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
1164 if err != nil {
1165 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1166 return nil, err
1167 }
1168
1169 topics, err := client.Topics()
1170 if err != nil {
1171 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1172 return nil, err
1173 }
1174
1175 return topics, nil
1176}