/*
 * Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "strings"
23 "sync"
24 "time"
25
Scott Baker2c1c4822019-10-16 11:02:41 -070026 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
khenaidoo26721882021-08-11 17:42:52 -040031 "github.com/opencord/voltha-lib-go/v7/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070032)
33
Scott Baker2c1c4822019-10-16 11:02:41 -070034// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
35// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
Joey Armstrong7f8436c2023-07-09 20:23:27 -040036// consumer or a group consumer
Scott Baker2c1c4822019-10-16 11:02:41 -070037type consumerChannels struct {
38 consumers []interface{}
khenaidoo26721882021-08-11 17:42:52 -040039 channels []chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -070040}
41
Kent Hagermanccfa2132019-12-17 13:29:34 -050042// static check to ensure SaramaClient implements Client
43var _ Client = &SaramaClient{}
44
Scott Baker2c1c4822019-10-16 11:02:41 -070045// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
Neha Sharmadd9af392020-04-28 09:03:57 +000048 KafkaAddress string
Scott Baker2c1c4822019-10-16 11:02:41 -070049 producer sarama.AsyncProducer
50 consumer sarama.Consumer
51 groupConsumers map[string]*scc.Consumer
52 lockOfGroupConsumers sync.RWMutex
53 consumerGroupPrefix string
54 consumerType int
55 consumerGroupName string
56 producerFlushFrequency int
57 producerFlushMessages int
58 producerFlushMaxmessages int
59 producerRetryMax int
60 producerRetryBackOff time.Duration
61 producerReturnSuccess bool
62 producerReturnErrors bool
63 consumerMaxwait int
64 maxProcessingTime int
65 numPartitions int
66 numReplicas int
67 autoCreateTopic bool
68 doneCh chan int
Scott Baker84a55ce2020-04-17 10:11:30 -070069 metadataCallback func(fromTopic string, timestamp time.Time)
Scott Baker2c1c4822019-10-16 11:02:41 -070070 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
74 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070075 alive bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070076 livenessMutex sync.Mutex
Scott Baker104b67d2019-10-29 15:56:27 -070077 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070081 healthinessMutex sync.Mutex
Scott Baker0fef6982019-12-12 09:49:42 -080082 healthy bool
83 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070084}
85
86type SaramaClientOption func(*SaramaClient)
87
Neha Sharmadd9af392020-04-28 09:03:57 +000088func Address(address string) SaramaClientOption {
Scott Baker2c1c4822019-10-16 11:02:41 -070089 return func(args *SaramaClient) {
Neha Sharmadd9af392020-04-28 09:03:57 +000090 args.KafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -070091 }
92}
93
94func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
Scott Baker104b67d2019-10-29 15:56:27 -0700190func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.livenessChannelInterval = opt
193 }
194}
195
Scott Baker2c1c4822019-10-16 11:02:41 -0700196func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
197 client := &SaramaClient{
Neha Sharmadd9af392020-04-28 09:03:57 +0000198 KafkaAddress: DefaultKafkaAddress,
Scott Baker2c1c4822019-10-16 11:02:41 -0700199 }
200 client.consumerType = DefaultConsumerType
201 client.producerFlushFrequency = DefaultProducerFlushFrequency
202 client.producerFlushMessages = DefaultProducerFlushMessages
203 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
204 client.producerReturnErrors = DefaultProducerReturnErrors
205 client.producerReturnSuccess = DefaultProducerReturnSuccess
206 client.producerRetryMax = DefaultProducerRetryMax
207 client.producerRetryBackOff = DefaultProducerRetryBackoff
208 client.consumerMaxwait = DefaultConsumerMaxwait
209 client.maxProcessingTime = DefaultMaxProcessingTime
210 client.numPartitions = DefaultNumberPartitions
211 client.numReplicas = DefaultNumberReplicas
212 client.autoCreateTopic = DefaultAutoCreateTopic
213 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700214 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700215
216 for _, option := range opts {
217 option(client)
218 }
219
220 client.groupConsumers = make(map[string]*scc.Consumer)
221
222 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
223 client.topicLockMap = make(map[string]*sync.RWMutex)
224 client.lockOfTopicLockMap = sync.RWMutex{}
225 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700226
Scott Baker0fef6982019-12-12 09:49:42 -0800227 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700228 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800229 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700230
Scott Baker2c1c4822019-10-16 11:02:41 -0700231 return client
232}
233
Neha Sharma94f16a92020-06-26 04:17:55 +0000234func (sc *SaramaClient) Start(ctx context.Context) error {
235 logger.Info(ctx, "Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700236
237 // Create the Done channel
238 sc.doneCh = make(chan int, 1)
239
240 var err error
241
242 // Add a cleanup in case of failure to startup
243 defer func() {
244 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000245 sc.Stop(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700246 }
247 }()
248
249 // Create the Cluster Admin
Neha Sharma94f16a92020-06-26 04:17:55 +0000250 if err = sc.createClusterAdmin(ctx); err != nil {
251 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700252 return err
253 }
254
255 // Create the Publisher
Neha Sharma94f16a92020-06-26 04:17:55 +0000256 if err := sc.createPublisher(ctx); err != nil {
257 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 return err
259 }
260
261 if sc.consumerType == DefaultConsumerType {
262 // Create the master consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000263 if err := sc.createConsumer(ctx); err != nil {
264 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700265 return err
266 }
267 }
268
269 // Create the topic to consumers/channel map
270 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
271
Neha Sharma94f16a92020-06-26 04:17:55 +0000272 logger.Info(ctx, "kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700273
Scott Baker104b67d2019-10-29 15:56:27 -0700274 sc.started = true
275
Scott Baker2c1c4822019-10-16 11:02:41 -0700276 return nil
277}
278
Neha Sharma94f16a92020-06-26 04:17:55 +0000279func (sc *SaramaClient) Stop(ctx context.Context) {
280 logger.Info(ctx, "stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700281
Scott Baker104b67d2019-10-29 15:56:27 -0700282 sc.started = false
283
Scott Baker2c1c4822019-10-16 11:02:41 -0700284 //Send a message over the done channel to close all long running routines
285 sc.doneCh <- 1
286
287 if sc.producer != nil {
288 if err := sc.producer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000289 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 }
291 }
292
293 if sc.consumer != nil {
294 if err := sc.consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000295 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700296 }
297 }
298
299 for key, val := range sc.groupConsumers {
Neha Sharma94f16a92020-06-26 04:17:55 +0000300 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700301 if err := val.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000302 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700303 }
304 }
305
306 if sc.cAdmin != nil {
307 if err := sc.cAdmin.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000308 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 }
310 }
311
312 //TODO: Clear the consumers map
313 //sc.clearConsumerChannelMap()
314
Neha Sharma94f16a92020-06-26 04:17:55 +0000315 logger.Info(ctx, "sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700316}
317
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400318// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
Scott Baker2c1c4822019-10-16 11:02:41 -0700319// the invoking function must hold the lock
Neha Sharma94f16a92020-06-26 04:17:55 +0000320func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700321 // Set the topic details
322 topicDetail := &sarama.TopicDetail{}
323 topicDetail.NumPartitions = int32(numPartition)
324 topicDetail.ReplicationFactor = int16(repFactor)
325 topicDetail.ConfigEntries = make(map[string]*string)
326 topicDetails := make(map[string]*sarama.TopicDetail)
327 topicDetails[topic.Name] = topicDetail
328
329 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
kesavandd85e52b2022-03-15 16:38:08 +0530330 switch typedErr := err.(type) {
331 case *sarama.TopicError:
332 if typedErr.Err == sarama.ErrTopicAlreadyExists {
333 err = nil
334 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700335 }
kesavandd85e52b2022-03-15 16:38:08 +0530336 if err != nil {
337 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
338 return err
339 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700340 }
341 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
342 // do so.
Neha Sharma94f16a92020-06-26 04:17:55 +0000343 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 return nil
345}
346
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400347// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
Scott Baker2c1c4822019-10-16 11:02:41 -0700348// ensure no two go routines are performing operations on the same topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000349func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700350 sc.lockTopic(topic)
351 defer sc.unLockTopic(topic)
352
Neha Sharma94f16a92020-06-26 04:17:55 +0000353 return sc.createTopic(ctx, topic, numPartition, repFactor)
Scott Baker2c1c4822019-10-16 11:02:41 -0700354}
355
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400356// DeleteTopic removes a topic from the kafka Broker
Neha Sharma94f16a92020-06-26 04:17:55 +0000357func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700358 sc.lockTopic(topic)
359 defer sc.unLockTopic(topic)
360
361 // Remove the topic from the broker
362 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
363 if err == sarama.ErrUnknownTopicOrPartition {
364 // Not an error as does not exist
Neha Sharma94f16a92020-06-26 04:17:55 +0000365 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 return nil
367 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000368 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700369 return err
370 }
371
372 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Neha Sharma94f16a92020-06-26 04:17:55 +0000373 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
374 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700375 return err
376 }
377 return nil
378}
379
380// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
381// messages from that topic
khenaidoo26721882021-08-11 17:42:52 -0400382func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700383 sc.lockTopic(topic)
384 defer sc.unLockTopic(topic)
385
Neha Sharma94f16a92020-06-26 04:17:55 +0000386 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700387
388 // If a consumers already exist for that topic then resuse it
389 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000390 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700391 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo26721882021-08-11 17:42:52 -0400392 ch := make(chan proto.Message)
Neha Sharma94f16a92020-06-26 04:17:55 +0000393 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700394 return ch, nil
395 }
396
397 // Register for the topic and set it up
khenaidoo26721882021-08-11 17:42:52 -0400398 var consumerListeningChannel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -0700399 var err error
400
401 // Use the consumerType option to figure out the type of consumer to launch
402 if sc.consumerType == PartitionConsumer {
403 if sc.autoCreateTopic {
Neha Sharma94f16a92020-06-26 04:17:55 +0000404 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
405 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700406 return nil, err
407 }
408 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000409 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
410 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700411 return nil, err
412 }
413 } else if sc.consumerType == GroupCustomer {
414 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
415 // does not consume from a precreated topic in some scenarios
416 //if sc.autoCreateTopic {
417 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000418 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700419 // return nil, err
420 // }
421 //}
422 //groupId := sc.consumerGroupName
423 groupId := getGroupId(kvArgs...)
424 // Include the group prefix
425 if groupId != "" {
426 groupId = sc.consumerGroupPrefix + groupId
427 } else {
428 // Need to use a unique group Id per topic
429 groupId = sc.consumerGroupPrefix + topic.Name
430 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000431 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
432 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700433 return nil, err
434 }
435
436 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000437 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700438 return nil, errors.New("unknown-consumer-type")
439 }
440
441 return consumerListeningChannel, nil
442}
443
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400444// UnSubscribe unsubscribe a consumer from a given topic
khenaidoo26721882021-08-11 17:42:52 -0400445func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700446 sc.lockTopic(topic)
447 defer sc.unLockTopic(topic)
448
Neha Sharma94f16a92020-06-26 04:17:55 +0000449 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700450 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000451 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
452 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000454 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
455 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 }
457 return err
458}
459
Neha Sharma94f16a92020-06-26 04:17:55 +0000460func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
Kent Hagermanccfa2132019-12-17 13:29:34 -0500461 sc.metadataCallback = callback
462}
463
Neha Sharma94f16a92020-06-26 04:17:55 +0000464func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Baker104b67d2019-10-29 15:56:27 -0700465 // Post a consistent stream of liveness data to the channel,
466 // so that in a live state, the core does not timeout and
467 // send a forced liveness message. Production of liveness
468 // events to the channel is rate-limited by livenessChannelInterval.
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700469 sc.livenessMutex.Lock()
470 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700471 if sc.liveness != nil {
472 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000473 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700474 sc.liveness <- alive
475 sc.lastLivenessTime = time.Now()
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800476 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Neha Sharma94f16a92020-06-26 04:17:55 +0000477 logger.Info(ctx, "update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700478 sc.liveness <- alive
479 sc.lastLivenessTime = time.Now()
480 }
481 }
482
483 // Only emit a log message when the state changes
484 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000485 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700486 sc.alive = alive
487 }
488}
489
Scott Baker0fef6982019-12-12 09:49:42 -0800490// Once unhealthy, we never go back
Neha Sharma94f16a92020-06-26 04:17:55 +0000491func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
Scott Baker0fef6982019-12-12 09:49:42 -0800492 sc.healthy = false
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700493 sc.healthinessMutex.Lock()
494 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800495 if sc.healthiness != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000496 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800497 sc.healthiness <- sc.healthy
498 }
499}
500
Neha Sharma94f16a92020-06-26 04:17:55 +0000501func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800502 // Sarama producers and consumers encapsulate the error inside
503 // a ProducerError or ConsumerError struct.
504 if prodError, ok := err.(*sarama.ProducerError); ok {
505 err = prodError.Err
506 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
507 err = consumerError.Err
508 }
509
510 // Sarama-Cluster will compose the error into a ClusterError struct,
511 // which we can't do a compare by reference. To handle that, we the
512 // best we can do is compare the error strings.
513
514 switch err.Error() {
515 case context.DeadlineExceeded.Error():
Neha Sharma94f16a92020-06-26 04:17:55 +0000516 logger.Info(ctx, "is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800517 return true
518 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Neha Sharma94f16a92020-06-26 04:17:55 +0000519 logger.Info(ctx, "is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800520 return true
521 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Neha Sharma94f16a92020-06-26 04:17:55 +0000522 logger.Info(ctx, "is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800523 return true
524 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Neha Sharma94f16a92020-06-26 04:17:55 +0000525 logger.Info(ctx, "is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800526 return true
527 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Neha Sharma94f16a92020-06-26 04:17:55 +0000528 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800529 return true
530 }
531
532 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Neha Sharma94f16a92020-06-26 04:17:55 +0000533 logger.Info(ctx, "is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800534 return true
535 }
536
Scott Baker718bee02020-01-07 09:52:02 -0800537 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Neha Sharma94f16a92020-06-26 04:17:55 +0000538 logger.Info(ctx, "is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800539 return true
540 }
541
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800542 // Other errors shouldn't trigger a loss of liveness
543
Neha Sharma94f16a92020-06-26 04:17:55 +0000544 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800545
546 return false
547}
548
Scott Baker2c1c4822019-10-16 11:02:41 -0700549// send formats and sends the request onto the kafka messaging bus.
Neha Sharma94f16a92020-06-26 04:17:55 +0000550func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700551
552 // Assert message is a proto message
553 var protoMsg proto.Message
554 var ok bool
555 // ascertain the value interface type is a proto.Message
556 if protoMsg, ok = msg.(proto.Message); !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000557 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800558 return fmt.Errorf("not-a-proto-msg-%s", msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700559 }
560
561 var marshalled []byte
562 var err error
563 // Create the Sarama producer message
564 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000565 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700566 return err
567 }
568 key := ""
569 if len(keys) > 0 {
570 key = keys[0] // Only the first key is relevant
571 }
572 kafkaMsg := &sarama.ProducerMessage{
573 Topic: topic.Name,
574 Key: sarama.StringEncoder(key),
575 Value: sarama.ByteEncoder(marshalled),
576 }
577
578 // Send message to kafka
579 sc.producer.Input() <- kafkaMsg
580 // Wait for result
581 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
582 select {
583 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000584 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
585 sc.updateLiveness(ctx, true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700586 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000587 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
588 if sc.isLivenessError(ctx, notOk) {
589 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700590 }
591 return notOk
592 }
593 return nil
594}
595
596// Enable the liveness monitor channel. This channel will report
597// a "true" or "false" on every publish, which indicates whether
598// or not the channel is still live. This channel is then picked up
599// by the service (i.e. rw_core / ro_core) to update readiness status
600// and/or take other actions.
Neha Sharma94f16a92020-06-26 04:17:55 +0000601func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
602 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700603 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700604 sc.livenessMutex.Lock()
605 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700606 if sc.liveness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000607 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700608 // At least 1, so we can immediately post to it without blocking
609 // Setting a bigger number (10) allows the monitor to fall behind
610 // without blocking others. The monitor shouldn't really fall
611 // behind...
612 sc.liveness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400613 // post initial state to the channel
Scott Baker104b67d2019-10-29 15:56:27 -0700614 sc.liveness <- sc.alive
615 }
616 } else {
617 // TODO: Think about whether we need the ability to turn off
618 // liveness monitoring
619 panic("Turning off liveness reporting is not supported")
620 }
621 return sc.liveness
622}
623
Scott Baker0fef6982019-12-12 09:49:42 -0800624// Enable the Healthiness monitor channel. This channel will report "false"
625// if the kafka consumers die, or some other problem occurs which is
626// catastrophic that would require re-creating the client.
Neha Sharma94f16a92020-06-26 04:17:55 +0000627func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
628 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800629 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700630 sc.healthinessMutex.Lock()
631 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800632 if sc.healthiness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000633 logger.Info(ctx, "kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800634 // At least 1, so we can immediately post to it without blocking
635 // Setting a bigger number (10) allows the monitor to fall behind
636 // without blocking others. The monitor shouldn't really fall
637 // behind...
638 sc.healthiness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400639 // post initial state to the channel
Scott Baker0fef6982019-12-12 09:49:42 -0800640 sc.healthiness <- sc.healthy
641 }
642 } else {
643 // TODO: Think about whether we need the ability to turn off
644 // liveness monitoring
645 panic("Turning off healthiness reporting is not supported")
646 }
647 return sc.healthiness
648}
649
Scott Baker104b67d2019-10-29 15:56:27 -0700650// send an empty message on the liveness channel to check whether connectivity has
651// been restored.
Neha Sharma94f16a92020-06-26 04:17:55 +0000652func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Baker104b67d2019-10-29 15:56:27 -0700653 if !sc.started {
654 return fmt.Errorf("SendLiveness() called while not started")
655 }
656
657 kafkaMsg := &sarama.ProducerMessage{
658 Topic: "_liveness_test",
659 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
660 }
661
662 // Send message to kafka
663 sc.producer.Input() <- kafkaMsg
664 // Wait for result
665 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
666 select {
667 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000668 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
669 sc.updateLiveness(ctx, true)
Scott Baker104b67d2019-10-29 15:56:27 -0700670 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000671 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
672 if sc.isLivenessError(ctx, notOk) {
673 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700674 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700675 return notOk
676 }
677 return nil
678}
679
680// getGroupId returns the group id from the key-value args.
681func getGroupId(kvArgs ...*KVArg) string {
682 for _, arg := range kvArgs {
683 if arg.Key == GroupIdKey {
684 return arg.Value.(string)
685 }
686 }
687 return ""
688}
689
690// getOffset returns the offset from the key-value args.
691func getOffset(kvArgs ...*KVArg) int64 {
692 for _, arg := range kvArgs {
693 if arg.Key == Offset {
694 return arg.Value.(int64)
695 }
696 }
697 return sarama.OffsetNewest
698}
699
Neha Sharma94f16a92020-06-26 04:17:55 +0000700func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
Neha Sharmadd9af392020-04-28 09:03:57 +0000707 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000708 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
Scott Baker2c1c4822019-10-16 11:02:41 -0700709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
Scott Baker2c1c4822019-10-16 11:02:41 -0700743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
khenaidoo26721882021-08-11 17:42:52 -0400753func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000760 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700761}
762
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400763// closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000764func closeConsumers(ctx context.Context, consumers []interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumers?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000770 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur on race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
778 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur on race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
790}
791
khenaidoo26721882021-08-11 17:42:52 -0400792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000797 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700798 // If there are no more channels then we can close the consumers itself
799 if len(consumerCh.channels) == 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000800 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
801 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700802 //err := consumerCh.consumers.Close()
803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000808 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700809 return errors.New("topic-does-not-exist")
810}
811
Neha Sharma94f16a92020-06-26 04:17:55 +0000812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000818 removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700819 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000820 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000829 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700830 return nil
831}
832
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400833// createPublisher creates the publisher which is used to send a message onto kafka
Neha Sharma94f16a92020-06-26 04:17:55 +0000834func (sc *SaramaClient) createPublisher(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700835 // This Creates the publisher
836 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530837 config.Version = sarama.V1_0_0_0
kesavandd85e52b2022-03-15 16:38:08 +0530838 config.Producer.Partitioner = sarama.NewHashPartitioner
Scott Baker2c1c4822019-10-16 11:02:41 -0700839 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
840 config.Producer.Flush.Messages = sc.producerFlushMessages
841 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
842 config.Producer.Return.Errors = sc.producerReturnErrors
843 config.Producer.Return.Successes = sc.producerReturnSuccess
844 //config.Producer.RequiredAcks = sarama.WaitForAll
845 config.Producer.RequiredAcks = sarama.WaitForLocal
846
Neha Sharmadd9af392020-04-28 09:03:57 +0000847 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700848
849 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000850 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700851 return err
852 } else {
853 sc.producer = producer
854 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000855 logger.Info(ctx, "Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700856 return nil
857}
858
Neha Sharma94f16a92020-06-26 04:17:55 +0000859func (sc *SaramaClient) createConsumer(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700860 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530861 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700862 config.Consumer.Return.Errors = true
863 config.Consumer.Fetch.Min = 1
864 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
865 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
866 config.Consumer.Offsets.Initial = sarama.OffsetNewest
867 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmadd9af392020-04-28 09:03:57 +0000868 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700869
870 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000871 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700872 return err
873 } else {
874 sc.consumer = consumer
875 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000876 logger.Info(ctx, "Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700877 return nil
878}
879
880// createGroupConsumer creates a consumers group
Neha Sharma94f16a92020-06-26 04:17:55 +0000881func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700882 config := scc.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530883 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700884 config.ClientID = uuid.New().String()
885 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700886 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
887 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700888 //config.Group.Return.Notifications = false
889 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
890 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
891 config.Consumer.Offsets.Initial = initialOffset
892 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharmadd9af392020-04-28 09:03:57 +0000893 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700894
895 topics := []string{topic.Name}
896 var consumer *scc.Consumer
897 var err error
898
899 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000900 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700901 return nil, err
902 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000903 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700904
905 //sc.groupConsumers[topic.Name] = consumer
906 sc.addToGroupConsumers(topic.Name, consumer)
907 return consumer, nil
908}
909
910// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
911// topic via the unique channel each subscriber received during subscription
khenaidoo26721882021-08-11 17:42:52 -0400912func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700913 // Need to go over all channels and publish messages to them - do we need to copy msg?
914 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700915 for _, ch := range consumerCh.channels {
khenaidoo26721882021-08-11 17:42:52 -0400916 go func(c chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700917 c <- protoMessage
918 }(ch)
919 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500920 sc.lockTopicToConsumerChannelMap.RUnlock()
921
922 if callback := sc.metadataCallback; callback != nil {
khenaidoo26721882021-08-11 17:42:52 -0400923 callback(fromTopic, ts)
Kent Hagermanccfa2132019-12-17 13:29:34 -0500924 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700925}
926
Neha Sharma94f16a92020-06-26 04:17:55 +0000927func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
928 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700929startloop:
930 for {
931 select {
932 case err, ok := <-consumer.Errors():
933 if ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000934 if sc.isLivenessError(ctx, err) {
935 sc.updateLiveness(ctx, false)
936 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100937 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700938 } else {
939 // Channel is closed
940 break startloop
941 }
942 case msg, ok := <-consumer.Messages():
Neha Sharma94f16a92020-06-26 04:17:55 +0000943 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700944 if !ok {
945 // channel is closed
946 break startloop
947 }
948 msgBody := msg.Value
Neha Sharma94f16a92020-06-26 04:17:55 +0000949 sc.updateLiveness(ctx, true)
950 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo26721882021-08-11 17:42:52 -0400951 var protoMsg proto.Message
952 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000953 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700954 continue
955 }
khenaidoo26721882021-08-11 17:42:52 -0400956 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
Scott Baker2c1c4822019-10-16 11:02:41 -0700957 case <-sc.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000958 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700959 break startloop
960 }
961 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000962 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
963 sc.setUnhealthy(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700964}
965
Neha Sharma94f16a92020-06-26 04:17:55 +0000966func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
967 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700968
969startloop:
970 for {
971 select {
972 case err, ok := <-consumer.Errors():
973 if ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000974 if sc.isLivenessError(ctx, err) {
975 sc.updateLiveness(ctx, false)
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800976 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000977 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700978 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000979 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700980 // channel is closed
981 break startloop
982 }
983 case msg, ok := <-consumer.Messages():
984 if !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000985 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700986 // Channel closed
987 break startloop
988 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000989 sc.updateLiveness(ctx, true)
990 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700991 msgBody := msg.Value
khenaidoo26721882021-08-11 17:42:52 -0400992 var protoMsg proto.Message
993 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000994 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700995 continue
996 }
khenaidoo26721882021-08-11 17:42:52 -0400997 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
Scott Baker2c1c4822019-10-16 11:02:41 -0700998 consumer.MarkOffset(msg, "")
999 case ntf := <-consumer.Notifications():
Neha Sharma94f16a92020-06-26 04:17:55 +00001000 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001001 case <-sc.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +00001002 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001003 break startloop
1004 }
1005 }
Neha Sharma94f16a92020-06-26 04:17:55 +00001006 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
1007 sc.setUnhealthy(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -07001008}
1009
Neha Sharma94f16a92020-06-26 04:17:55 +00001010func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1011 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001012 var consumerCh *consumerChannels
1013 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001014 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001015 return errors.New("consumers-not-exist")
1016 }
1017 // For each consumer listening for that topic, start a consumption loop
1018 for _, consumer := range consumerCh.consumers {
1019 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001020 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001021 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001022 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001023 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +00001024 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001025 return errors.New("invalid-consumer")
1026 }
1027 }
1028 return nil
1029}
1030
Joey Armstrong7f8436c2023-07-09 20:23:27 -04001031// // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1032// // for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001033func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001034 var pConsumers []sarama.PartitionConsumer
1035 var err error
1036
Neha Sharma94f16a92020-06-26 04:17:55 +00001037 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1038 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001039 return nil, err
1040 }
1041
1042 consumersIf := make([]interface{}, 0)
1043 for _, pConsumer := range pConsumers {
1044 consumersIf = append(consumersIf, pConsumer)
1045 }
1046
1047 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1048 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001049 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001050 cc := &consumerChannels{
1051 consumers: consumersIf,
khenaidoo26721882021-08-11 17:42:52 -04001052 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001053 }
1054
1055 // Add the consumers channel to the map
1056 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1057
1058 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001059 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001060 if err := sc.startConsumers(ctx, topic); err != nil {
1061 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001062 "topic": topic,
1063 "error": err})
1064 }
1065 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001066
1067 return consumerListeningChannel, nil
1068}
1069
1070// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1071// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001072func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001073 // TODO: Replace this development partition consumers with a group consumers
1074 var pConsumer *scc.Consumer
1075 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +00001076 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1077 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001078 return nil, err
1079 }
1080 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1081 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001082 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001083 cc := &consumerChannels{
1084 consumers: []interface{}{pConsumer},
khenaidoo26721882021-08-11 17:42:52 -04001085 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001086 }
1087
1088 // Add the consumers channel to the map
1089 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1090
1091 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001092 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001093 if err := sc.startConsumers(ctx, topic); err != nil {
1094 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001095 "topic": topic,
1096 "error": err})
1097 }
1098 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001099
1100 return consumerListeningChannel, nil
1101}
1102
Neha Sharma94f16a92020-06-26 04:17:55 +00001103func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1104 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001105 partitionList, err := sc.consumer.Partitions(topic.Name)
1106 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001107 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001108 return nil, err
1109 }
1110
1111 pConsumers := make([]sarama.PartitionConsumer, 0)
1112 for _, partition := range partitionList {
1113 var pConsumer sarama.PartitionConsumer
1114 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001115 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001116 return nil, err
1117 }
1118 pConsumers = append(pConsumers, pConsumer)
1119 }
1120 return pConsumers, nil
1121}
1122
khenaidoo26721882021-08-11 17:42:52 -04001123func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
Scott Baker2c1c4822019-10-16 11:02:41 -07001124 var i int
khenaidoo26721882021-08-11 17:42:52 -04001125 var channel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -07001126 for i, channel = range channels {
1127 if channel == ch {
1128 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1129 close(channel)
Neha Sharma94f16a92020-06-26 04:17:55 +00001130 logger.Debug(ctx, "channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001131 return channels[:len(channels)-1]
1132 }
1133 }
1134 return channels
1135}
1136
1137func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1138 sc.lockOfGroupConsumers.Lock()
1139 defer sc.lockOfGroupConsumers.Unlock()
1140 if _, exist := sc.groupConsumers[topic]; !exist {
1141 sc.groupConsumers[topic] = consumer
1142 }
1143}
1144
Neha Sharma94f16a92020-06-26 04:17:55 +00001145func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -07001146 sc.lockOfGroupConsumers.Lock()
1147 defer sc.lockOfGroupConsumers.Unlock()
1148 if _, exist := sc.groupConsumers[topic]; exist {
1149 consumer := sc.groupConsumers[topic]
1150 delete(sc.groupConsumers, topic)
1151 if err := consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001152 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001153 return err
1154 }
1155 }
1156 return nil
1157}
kesavandd85e52b2022-03-15 16:38:08 +05301158
1159func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
1160
1161 config := sarama.NewConfig()
1162 client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
1163 if err != nil {
1164 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1165 return nil, err
1166 }
1167
1168 topics, err := client.Topics()
1169 if err != nil {
1170 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1171 return nil, err
1172 }
1173
1174 return topics, nil
1175}