1/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
28 "github.com/eapache/go-resiliency/breaker"
29 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v7/pkg/log"
32)
33
34// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
35// topic, the consumer(s) broadcast the message to all the listening channels. A consumer can be either a partition
36// consumer or a group consumer
37type consumerChannels struct {
38 consumers []interface{}
39 channels []chan proto.Message
40}
41
42// static check to ensure SaramaClient implements Client
43var _ Client = &SaramaClient{}
44
45// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
48 KafkaAddress string
49 producer sarama.AsyncProducer
50 consumer sarama.Consumer
51 groupConsumers map[string]*scc.Consumer
52 lockOfGroupConsumers sync.RWMutex
53 consumerGroupPrefix string
54 consumerType int
55 consumerGroupName string
56 producerFlushFrequency int
57 producerFlushMessages int
58 producerFlushMaxmessages int
59 producerRetryMax int
60 producerRetryBackOff time.Duration
61 producerReturnSuccess bool
62 producerReturnErrors bool
63 consumerMaxwait int
64 maxProcessingTime int
65 numPartitions int
66 numReplicas int
67 autoCreateTopic bool
68 doneCh chan int
69 metadataCallback func(fromTopic string, timestamp time.Time)
70 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
74 metadataMaxRetry int
75 alive bool
76 livenessMutex sync.Mutex
77 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
81 healthinessMutex sync.Mutex
82 healthy bool
83 healthiness chan bool
84}
85
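// SaramaClientOption is a functional option used by NewSaramaClient to override one of the client defaults.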
86type SaramaClientOption func(*SaramaClient)
87
88func Address(address string) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaAddress = address
91 }
92}
93
94func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
190func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.livenessChannelInterval = opt
193 }
194}
195
196func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
197 client := &SaramaClient{
198 KafkaAddress: DefaultKafkaAddress,
199 }
200 client.consumerType = DefaultConsumerType
201 client.producerFlushFrequency = DefaultProducerFlushFrequency
202 client.producerFlushMessages = DefaultProducerFlushMessages
203 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
204 client.producerReturnErrors = DefaultProducerReturnErrors
205 client.producerReturnSuccess = DefaultProducerReturnSuccess
206 client.producerRetryMax = DefaultProducerRetryMax
207 client.producerRetryBackOff = DefaultProducerRetryBackoff
208 client.consumerMaxwait = DefaultConsumerMaxwait
209 client.maxProcessingTime = DefaultMaxProcessingTime
210 client.numPartitions = DefaultNumberPartitions
211 client.numReplicas = DefaultNumberReplicas
212 client.autoCreateTopic = DefaultAutoCreateTopic
213 client.metadataMaxRetry = DefaultMetadataMaxRetry
214 client.livenessChannelInterval = DefaultLivenessChannelInterval
215
216 for _, option := range opts {
217 option(client)
218 }
219
220 client.groupConsumers = make(map[string]*scc.Consumer)
221
222 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
223 client.topicLockMap = make(map[string]*sync.RWMutex)
224 client.lockOfTopicLockMap = sync.RWMutex{}
225 client.lockOfGroupConsumers = sync.RWMutex{}
226
227 // healthy and alive until proven otherwise
228 client.alive = true
229 client.healthy = true
230
231 return client
232}
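// Illustrative sketch (not part of the library): a client is typically built from functional options and then
// started; any option not supplied keeps its Default* value. The broker address below is a placeholder.
//
//	kc := NewSaramaClient(
//		Address("kafka:9092"),
//		ConsumerType(GroupCustomer),
//		NumPartitions(3),
//		NumReplicas(1),
//	)
//	if err := kc.Start(ctx); err != nil {
//		// handle startup failure
//	}
//	defer kc.Stop(ctx)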
233
234func (sc *SaramaClient) Start(ctx context.Context) error {
235 logger.Info(ctx, "Starting-kafka-sarama-client")
236
237 // Create the Done channel
238 sc.doneCh = make(chan int, 1)
239
240 var err error
241
242 // Add a cleanup in case of failure to startup
243 defer func() {
244 if err != nil {
245 sc.Stop(ctx)
246 }
247 }()
248
249 // Create the Cluster Admin
250 if err = sc.createClusterAdmin(ctx); err != nil {
251 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
252 return err
253 }
254
255 // Create the Publisher
256 if err := sc.createPublisher(ctx); err != nil {
257 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
258 return err
259 }
260
261 if sc.consumerType == DefaultConsumerType {
262 // Create the master consumers
263 if err := sc.createConsumer(ctx); err != nil {
264 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
265 return err
266 }
267 }
268
269 // Create the topic to consumers/channel map
270 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
271
272 logger.Info(ctx, "kafka-sarama-client-started")
273
274 sc.started = true
275
276 return nil
277}
278
279func (sc *SaramaClient) Stop(ctx context.Context) {
280 logger.Info(ctx, "stopping-sarama-client")
281
282 sc.started = false
283
284 //Send a message over the done channel to close all long running routines
285 sc.doneCh <- 1
286
287 if sc.producer != nil {
288 if err := sc.producer.Close(); err != nil {
289 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
290 }
291 }
292
293 if sc.consumer != nil {
294 if err := sc.consumer.Close(); err != nil {
295 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
296 }
297 }
298
299 for key, val := range sc.groupConsumers {
300 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
301 if err := val.Close(); err != nil {
302 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
303 }
304 }
305
306 if sc.cAdmin != nil {
307 if err := sc.cAdmin.Close(); err != nil {
308 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
309 }
310 }
311
312 //TODO: Clear the consumers map
313 //sc.clearConsumerChannelMap()
314
315 logger.Info(ctx, "sarama-client-stopped")
316}
317
318//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
319// the invoking function must hold the lock
320func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
321 // Set the topic details
322 topicDetail := &sarama.TopicDetail{}
323 topicDetail.NumPartitions = int32(numPartition)
324 topicDetail.ReplicationFactor = int16(repFactor)
325 topicDetail.ConfigEntries = make(map[string]*string)
326 topicDetails := make(map[string]*sarama.TopicDetail)
327 topicDetails[topic.Name] = topicDetail
328
329 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
330 switch typedErr := err.(type) {
331 case *sarama.TopicError:
332 if typedErr.Err == sarama.ErrTopicAlreadyExists {
333 err = nil
334 }
335 }
336 if err != nil {
337 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
338 return err
339 }
340 }
341 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
342 // do so.
343 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
344 return nil
345}
346
347//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
348// ensure no two goroutines are performing operations on the same topic
349func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
350 sc.lockTopic(topic)
351 defer sc.unLockTopic(topic)
352
353 return sc.createTopic(ctx, topic, numPartition, repFactor)
354}
355
356//DeleteTopic removes a topic from the kafka Broker
357func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
358 sc.lockTopic(topic)
359 defer sc.unLockTopic(topic)
360
361 // Remove the topic from the broker
362 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
363 if err == sarama.ErrUnknownTopicOrPartition {
364 // Not an error as the topic does not exist
365 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
366 return nil
367 }
368 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
369 return err
370 }
371
372 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
373 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
374 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
375 return err
376 }
377 return nil
378}
379
380// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
381// messages from that topic
382func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
383 sc.lockTopic(topic)
384 defer sc.unLockTopic(topic)
385
386 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
387
388 // If a consumer already exists for that topic then reuse it
389 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
390 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
391 // Create a channel specific to that consumer and add it to the consumer's channel map
392 ch := make(chan proto.Message)
393 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
394 return ch, nil
395 }
396
397 // Register for the topic and set it up
398 var consumerListeningChannel chan proto.Message
399 var err error
400
401 // Use the consumerType option to figure out the type of consumer to launch
402 if sc.consumerType == PartitionConsumer {
403 if sc.autoCreateTopic {
404 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
405 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
406 return nil, err
407 }
408 }
409 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
410 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
411 return nil, err
412 }
413 } else if sc.consumerType == GroupCustomer {
414 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
415 // does not consume from a precreated topic in some scenarios
416 //if sc.autoCreateTopic {
417 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
418 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
419 // return nil, err
420 // }
421 //}
422 //groupId := sc.consumerGroupName
423 groupId := getGroupId(kvArgs...)
424 // Include the group prefix
425 if groupId != "" {
426 groupId = sc.consumerGroupPrefix + groupId
427 } else {
428 // Need to use a unique group Id per topic
429 groupId = sc.consumerGroupPrefix + topic.Name
430 }
431 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
432 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
433 return nil, err
434 }
435
436 } else {
437 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
438 return nil, errors.New("unknown-consumer-type")
439 }
440
441 return consumerListeningChannel, nil
442}
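// Illustrative sketch (assumes a Topic constructed with only its Name field, as used throughout this file, and a
// started client kc): Subscribe returns a receive-only channel that the caller drains in its own goroutine.
// The topic name is a placeholder.
//
//	ch, err := kc.Subscribe(ctx, &Topic{Name: "voltha.events"})
//	if err != nil {
//		// handle subscription failure
//	}
//	go func() {
//		for msg := range ch {
//			// process msg (a proto.Message)
//			_ = msg
//		}
//	}()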
443
444//UnSubscribe unsubscribes a consumer from a given topic
445func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
446 sc.lockTopic(topic)
447 defer sc.unLockTopic(topic)
448
449 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
450 var err error
451 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
452 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
453 }
454 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
455 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
456 }
457 return err
458}
459
460func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
461 sc.metadataCallback = callback
462}
463
464func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
465 // Post a consistent stream of liveness data to the channel,
466 // so that in a live state, the core does not timeout and
467 // send a forced liveness message. Production of liveness
468 // events to the channel is rate-limited by livenessChannelInterval.
469 sc.livenessMutex.Lock()
470 defer sc.livenessMutex.Unlock()
471 if sc.liveness != nil {
472 if sc.alive != alive {
473 logger.Info(ctx, "update-liveness-channel-because-change")
474 sc.liveness <- alive
475 sc.lastLivenessTime = time.Now()
476 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
477 logger.Info(ctx, "update-liveness-channel-because-interval")
478 sc.liveness <- alive
479 sc.lastLivenessTime = time.Now()
480 }
481 }
482
483 // Only emit a log message when the state changes
484 if sc.alive != alive {
485 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
486 sc.alive = alive
487 }
488}
489
490// Once unhealthy, we never go back
491func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
492 sc.healthy = false
493 sc.healthinessMutex.Lock()
494 defer sc.healthinessMutex.Unlock()
495 if sc.healthiness != nil {
496 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
497 sc.healthiness <- sc.healthy
498 }
499}
500
501func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
502 // Sarama producers and consumers encapsulate the error inside
503 // a ProducerError or ConsumerError struct.
504 if prodError, ok := err.(*sarama.ProducerError); ok {
505 err = prodError.Err
506 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
507 err = consumerError.Err
508 }
509
510 // Sarama-Cluster wraps the error in a ClusterError struct, which we
511 // cannot compare by reference. The best we can do is compare the
512 // error strings.
513
514 switch err.Error() {
515 case context.DeadlineExceeded.Error():
516 logger.Info(ctx, "is-liveness-error-timeout")
517 return true
518 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
519 logger.Info(ctx, "is-liveness-error-no-brokers")
520 return true
521 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
522 logger.Info(ctx, "is-liveness-error-shutting-down")
523 return true
524 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
525 logger.Info(ctx, "is-liveness-error-not-available")
526 return true
527 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
528 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
529 return true
530 }
531
532 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
533 logger.Info(ctx, "is-liveness-error-connection-refused")
534 return true
535 }
536
537 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
538 logger.Info(ctx, "is-liveness-error-io-timeout")
539 return true
540 }
541
542 // Other errors shouldn't trigger a loss of liveness
543
544 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
545
546 return false
547}
548
549// Send formats and sends the request onto the kafka messaging bus.
550func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
551
552 // Assert message is a proto message
553 var protoMsg proto.Message
554 var ok bool
555 // ascertain the value interface type is a proto.Message
556 if protoMsg, ok = msg.(proto.Message); !ok {
557 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
558 return fmt.Errorf("not-a-proto-msg-%s", msg)
559 }
560
561 var marshalled []byte
562 var err error
563 // Create the Sarama producer message
564 if marshalled, err = proto.Marshal(protoMsg); err != nil {
565 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
566 return err
567 }
568 key := ""
569 if len(keys) > 0 {
570 key = keys[0] // Only the first key is relevant
571 }
572 kafkaMsg := &sarama.ProducerMessage{
573 Topic: topic.Name,
574 Key: sarama.StringEncoder(key),
575 Value: sarama.ByteEncoder(marshalled),
576 }
577
578 // Send message to kafka
579 sc.producer.Input() <- kafkaMsg
580 // Wait for result
581 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
582 select {
583 case ok := <-sc.producer.Successes():
584 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
585 sc.updateLiveness(ctx, true)
586 case notOk := <-sc.producer.Errors():
587 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
588 if sc.isLivenessError(ctx, notOk) {
589 sc.updateLiveness(ctx, false)
590 }
591 return notOk
592 }
593 return nil
594}
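// Illustrative sketch (given a started client kc): Send accepts any proto.Message; the optional key selects the
// kafka partition via the hash partitioner configured in createPublisher. The event value, topic name and key
// below are placeholders.
//
//	if err := kc.Send(ctx, event, &Topic{Name: "voltha.events"}, deviceID); err != nil {
//		// message could not be published
//	}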
595
596// EnableLivenessChannel enables the liveness monitor channel. This channel reports
597// "true" or "false" on every publish, indicating whether or not the
598// connection is still live. The channel is then picked up
599// by the service (i.e. rw_core / ro_core) to update readiness status
600// and/or take other actions.
601func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
602 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
603 if enable {
604 sc.livenessMutex.Lock()
605 defer sc.livenessMutex.Unlock()
606 if sc.liveness == nil {
607 logger.Info(ctx, "kafka-create-liveness-channel")
608 // At least 1, so we can immediately post to it without blocking
609 // Setting a bigger number (10) allows the monitor to fall behind
610 // without blocking others. The monitor shouldn't really fall
611 // behind...
612 sc.liveness = make(chan bool, 10)
613 // post initial state to the channel
614 sc.liveness <- sc.alive
615 }
616 } else {
617 // TODO: Think about whether we need the ability to turn off
618 // liveness monitoring
619 panic("Turning off liveness reporting is not supported")
620 }
621 return sc.liveness
622}
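// Illustrative sketch (given a started client kc): a service typically drains the liveness channel in its own
// goroutine and updates its readiness probe from the reported values.
//
//	alive := kc.EnableLivenessChannel(ctx, true)
//	go func() {
//		for ok := range alive {
//			// ok == false means kafka connectivity was lost; SendLiveness can be used to probe for recovery
//			_ = ok
//		}
//	}()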
623
624// EnableHealthinessChannel enables the healthiness monitor channel. This channel reports "false"
625// if the kafka consumers die, or some other catastrophic problem occurs
626// that would require re-creating the client.
627func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
628 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
629 if enable {
630 sc.healthinessMutex.Lock()
631 defer sc.healthinessMutex.Unlock()
632 if sc.healthiness == nil {
633 logger.Info(ctx, "kafka-create-healthiness-channel")
634 // At least 1, so we can immediately post to it without blocking
635 // Setting a bigger number (10) allows the monitor to fall behind
636 // without blocking others. The monitor shouldn't really fall
637 // behind...
638 sc.healthiness = make(chan bool, 10)
639 // post initial state to the channel
640 sc.healthiness <- sc.healthy
641 }
642 } else {
643 // TODO: Think about whether we need the ability to turn off
644 // healthiness monitoring
645 panic("Turning off healthiness reporting is not supported")
646 }
647 return sc.healthiness
648}
649
650// SendLiveness publishes a test message to the _liveness_test topic to check whether connectivity has
651// been restored.
652func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
653 if !sc.started {
654 return fmt.Errorf("SendLiveness() called while not started")
655 }
656
657 kafkaMsg := &sarama.ProducerMessage{
658 Topic: "_liveness_test",
659 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
660 }
661
662 // Send message to kafka
663 sc.producer.Input() <- kafkaMsg
664 // Wait for result
665 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
666 select {
667 case ok := <-sc.producer.Successes():
668 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
669 sc.updateLiveness(ctx, true)
670 case notOk := <-sc.producer.Errors():
671 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
672 if sc.isLivenessError(ctx, notOk) {
673 sc.updateLiveness(ctx, false)
674 }
675 return notOk
676 }
677 return nil
678}
679
680// getGroupId returns the group id from the key-value args.
681func getGroupId(kvArgs ...*KVArg) string {
682 for _, arg := range kvArgs {
683 if arg.Key == GroupIdKey {
684 return arg.Value.(string)
685 }
686 }
687 return ""
688}
689
690// getOffset returns the offset from the key-value args.
691func getOffset(kvArgs ...*KVArg) int64 {
692 for _, arg := range kvArgs {
693 if arg.Key == Offset {
694 return arg.Value.(int64)
695 }
696 }
697 return sarama.OffsetNewest
698}
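// Illustrative sketch: callers influence group membership and the starting offset by passing KVArg pairs to
// Subscribe; GroupIdKey and Offset are the keys inspected above. The group id and topic name are placeholders.
//
//	ch, err := kc.Subscribe(ctx, &Topic{Name: "voltha.events"},
//		&KVArg{Key: GroupIdKey, Value: "rw-core"},
//		&KVArg{Key: Offset, Value: sarama.OffsetOldest})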
699
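// createClusterAdmin creates the sarama cluster admin used for topic management (create/delete).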
700func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
707 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
708 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
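// lockTopic acquires the per-topic mutex, creating it on first use, so that topic operations (create, delete,
// subscribe, unsubscribe) are serialized per topic rather than globally.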
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
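// addTopicToConsumerChannelMap records the consumers/channels pair for a topic if none is registered yet.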
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
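// getConsumerChannel returns the consumers/channels pair registered for a topic, or nil if there is none.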
743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
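// addChannelToConsumerChannelMap appends an additional listening channel to an existing topic subscription.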
753func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
760 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
761}
762
763//closeConsumers closes a list of sarama consumers. The consumers can be either partition consumers or group consumers
764func closeConsumers(ctx context.Context, consumers []interface{}) error {
765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumer?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
770 logger.Debugw(ctx, "close-partition-consumer-error", log.Fields{"err": errTemp})
771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur due to a race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
778 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur due to a race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
790}
791
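// removeChannelFromConsumerChannelMap removes (and closes) one listening channel for a topic; when the last
// channel is removed the underlying consumers are closed as well.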
792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
797 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
798 // If there are no more channels then we can close the consumer itself
799 if len(consumerCh.channels) == 0 {
800 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
801 err := closeConsumers(ctx, consumerCh.consumers)
802 //err := consumerCh.consumers.Close()
803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
808 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
809 return errors.New("topic-does-not-exist")
810}
811
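// clearTopicFromConsumerChannelMap closes every listening channel and consumer for a topic and removes the
// topic from the map; used when the topic itself is deleted.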
812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
818 removeChannel(ctx, consumerCh.channels, ch)
819 }
820 err := closeConsumers(ctx, consumerCh.consumers)
821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
829 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
830 return nil
831}
832
833//createPublisher creates the publisher which is used to send a message onto kafka
834func (sc *SaramaClient) createPublisher(ctx context.Context) error {
835 // This creates the publisher
836 config := sarama.NewConfig()
837 config.Version = sarama.V1_0_0_0
838 config.Producer.Partitioner = sarama.NewHashPartitioner
839 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
840 config.Producer.Flush.Messages = sc.producerFlushMessages
841 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
842 config.Producer.Return.Errors = sc.producerReturnErrors
843 config.Producer.Return.Successes = sc.producerReturnSuccess
844 //config.Producer.RequiredAcks = sarama.WaitForAll
845 config.Producer.RequiredAcks = sarama.WaitForLocal
846
847 brokers := []string{sc.KafkaAddress}
848
849 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
850 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
851 return err
852 } else {
853 sc.producer = producer
854 }
855 logger.Info(ctx, "Kafka-publisher-created")
856 return nil
857}
858
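// createConsumer creates the plain sarama consumer used when the client runs in PartitionConsumer mode.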
859func (sc *SaramaClient) createConsumer(ctx context.Context) error {
860 config := sarama.NewConfig()
861 config.Version = sarama.V1_0_0_0
862 config.Consumer.Return.Errors = true
863 config.Consumer.Fetch.Min = 1
864 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
865 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
866 config.Consumer.Offsets.Initial = sarama.OffsetNewest
867 config.Metadata.Retry.Max = sc.metadataMaxRetry
868 brokers := []string{sc.KafkaAddress}
869
870 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
871 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
872 return err
873 } else {
874 sc.consumer = consumer
875 }
876 logger.Info(ctx, "Kafka-consumers-created")
877 return nil
878}
879
880// createGroupConsumer creates a consumer group
881func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
882 config := scc.NewConfig()
883 config.Version = sarama.V1_0_0_0
884 config.ClientID = uuid.New().String()
885 config.Group.Mode = scc.ConsumerModeMultiplex
886 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
887 config.Consumer.Return.Errors = true
888 //config.Group.Return.Notifications = false
889 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
890 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
891 config.Consumer.Offsets.Initial = initialOffset
892 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
893 brokers := []string{sc.KafkaAddress}
894
895 topics := []string{topic.Name}
896 var consumer *scc.Consumer
897 var err error
898
899 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
900 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
901 return nil, err
902 }
903 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
904
905 //sc.groupConsumers[topic.Name] = consumer
906 sc.addToGroupConsumers(topic.Name, consumer)
907 return consumer, nil
908}
909
910// dispatchToConsumers sends the message received on a given topic to all subscribers for that
911// topic via the unique channel each subscriber received during subscription
912func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
913 // Need to go over all channels and publish messages to them - do we need to copy msg?
914 sc.lockTopicToConsumerChannelMap.RLock()
915 for _, ch := range consumerCh.channels {
916 go func(c chan proto.Message) {
917 c <- protoMessage
918 }(ch)
919 }
920 sc.lockTopicToConsumerChannelMap.RUnlock()
921
922 if callback := sc.metadataCallback; callback != nil {
923 callback(fromTopic, ts)
924 }
925}
926
927func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
928 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
929startloop:
930 for {
931 select {
932 case err, ok := <-consumer.Errors():
933 if ok {
934 if sc.isLivenessError(ctx, err) {
935 sc.updateLiveness(ctx, false)
936 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
937 }
938 } else {
939 // Channel is closed
940 break startloop
941 }
942 case msg, ok := <-consumer.Messages():
943 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
944 if !ok {
945 // channel is closed
946 break startloop
947 }
948 msgBody := msg.Value
949 sc.updateLiveness(ctx, true)
950 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
951 var protoMsg proto.Message
952 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
953 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
954 continue
955 }
956 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
957 case <-sc.doneCh:
958 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
959 break startloop
960 }
961 }
962 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
963 sc.setUnhealthy(ctx)
964}
965
966func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
967 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
968
969startloop:
970 for {
971 select {
972 case err, ok := <-consumer.Errors():
973 if ok {
974 if sc.isLivenessError(ctx, err) {
975 sc.updateLiveness(ctx, false)
976 }
977 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
978 } else {
979 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
980 // channel is closed
981 break startloop
982 }
983 case msg, ok := <-consumer.Messages():
984 if !ok {
985 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
986 // Channel closed
987 break startloop
988 }
989 sc.updateLiveness(ctx, true)
990 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
991 msgBody := msg.Value
992 var protoMsg proto.Message
993 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
994 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
995 continue
996 }
997 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
998 consumer.MarkOffset(msg, "")
999 case ntf := <-consumer.Notifications():
1000 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
1001 case <-sc.doneCh:
1002 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
1003 break startloop
1004 }
1005 }
1006 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
1007 sc.setUnhealthy(ctx)
1008}
1009
1010func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1011 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
1012 var consumerCh *consumerChannels
1013 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
1014 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
1015 return errors.New("consumers-not-exist")
1016 }
1017 // For each consumer listening for that topic, start a consumption loop
1018 for _, consumer := range consumerCh.consumers {
1019 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1020 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
1021 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1022 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
1023 } else {
1024 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
1025 return errors.New("invalid-consumer")
1026 }
1027 }
1028 return nil
1029}
1030
1031// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1032// for that topic. It also starts the routine that listens for messages on that topic.
1033func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
1034 var pConsumers []sarama.PartitionConsumer
1035 var err error
1036
1037 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1038 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1039 return nil, err
1040 }
1041
1042 consumersIf := make([]interface{}, 0)
1043 for _, pConsumer := range pConsumers {
1044 consumersIf = append(consumersIf, pConsumer)
1045 }
1046
1047 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1048 // unbuffered to verify race conditions.
1049 consumerListeningChannel := make(chan proto.Message)
1050 cc := &consumerChannels{
1051 consumers: consumersIf,
1052 channels: []chan proto.Message{consumerListeningChannel},
1053 }
1054
1055 // Add the consumers channel to the map
1056 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1057
1058 //Start a consumer to listen on that specific topic
1059 go func() {
1060 if err := sc.startConsumers(ctx, topic); err != nil {
1061 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
1062 "topic": topic,
1063 "error": err})
1064 }
1065 }()
1066
1067 return consumerListeningChannel, nil
1068}
1069
1070// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1071// for that topic. It also starts the routine that listens for messages on that topic.
1072func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
1073 // TODO: Replace this development partition consumers with a group consumers
1074 var pConsumer *scc.Consumer
1075 var err error
1076 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1077 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1078 return nil, err
1079 }
1080 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1081 // unbuffered to verify race conditions.
1082 consumerListeningChannel := make(chan proto.Message)
1083 cc := &consumerChannels{
1084 consumers: []interface{}{pConsumer},
1085 channels: []chan proto.Message{consumerListeningChannel},
1086 }
1087
1088 // Add the consumers channel to the map
1089 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1090
1091 //Start a consumer to listen on that specific topic
1092 go func() {
1093 if err := sc.startConsumers(ctx, topic); err != nil {
1094 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
1095 "topic": topic,
1096 "error": err})
1097 }
1098 }()
1099
1100 return consumerListeningChannel, nil
1101}
1102
1103func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1104 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
1105 partitionList, err := sc.consumer.Partitions(topic.Name)
1106 if err != nil {
1107 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1108 return nil, err
1109 }
1110
1111 pConsumers := make([]sarama.PartitionConsumer, 0)
1112 for _, partition := range partitionList {
1113 var pConsumer sarama.PartitionConsumer
1114 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1115 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1116 return nil, err
1117 }
1118 pConsumers = append(pConsumers, pConsumer)
1119 }
1120 return pConsumers, nil
1121}
1122
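// removeChannel closes the matching channel and removes it from the slice by swapping it with the last element.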
1123func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
1124 var i int
1125 var channel chan proto.Message
1126 for i, channel = range channels {
1127 if channel == ch {
1128 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1129 close(channel)
1130 logger.Debug(ctx, "channel-closed")
1131 return channels[:len(channels)-1]
1132 }
1133 }
1134 return channels
1135}
1136
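// addToGroupConsumers tracks a group consumer by topic so it can be closed on UnSubscribe or Stop.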
1137func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1138 sc.lockOfGroupConsumers.Lock()
1139 defer sc.lockOfGroupConsumers.Unlock()
1140 if _, exist := sc.groupConsumers[topic]; !exist {
1141 sc.groupConsumers[topic] = consumer
1142 }
1143}
1144
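// deleteFromGroupConsumers closes and forgets the group consumer registered for a topic, if any.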
1145func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
1146 sc.lockOfGroupConsumers.Lock()
1147 defer sc.lockOfGroupConsumers.Unlock()
1148 if _, exist := sc.groupConsumers[topic]; exist {
1149 consumer := sc.groupConsumers[topic]
1150 delete(sc.groupConsumers, topic)
1151 if err := consumer.Close(); err != nil {
1152 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
1153 return err
1154 }
1155 }
1156 return nil
1157}
1158
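// ListTopics returns the names of all topics known to the broker, using a short-lived sarama client.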
1159func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
1160
1161 config := sarama.NewConfig()
1162 client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
1163 if err != nil {
1164 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1165 return nil, err
1166 }
1167 defer client.Close() // close the transient client so broker connections are not leaked
1168 topics, err := client.Topics()
1169 if err != nil {
1170 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1171 return nil, err
1172 }
1173
1174 return topics, nil
1175}