/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"context"
	"errors"
	"fmt"
	"github.com/Shopify/sarama"
	scc "github.com/bsm/sarama-cluster"
	"github.com/eapache/go-resiliency/breaker"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-lib-go/v2/pkg/log"
	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
	"strings"
	"sync"
	"time"
)

func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}

type returnErrorFunction func() error

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcast the message to all of the listening channels. A consumer can be either a
// partition consumer or a group consumer.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ic.InterContainerMessage
}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumers                map[string]*scc.Consumer
	lockOfGroupConsumers          sync.RWMutex
	consumerGroupPrefix           string
	consumerType                  int
	consumerGroupName             string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int
	alive                         bool
	liveness                      chan bool
	livenessChannelInterval       time.Duration
	lastLivenessTime              time.Time
	started                       bool
	healthy                       bool
	healthiness                   chan bool
}

type SaramaClientOption func(*SaramaClient)

func Host(host string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaHost = host
	}
}

func Port(port int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaPort = port
	}
}

func ConsumerGroupPrefix(prefix string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupPrefix = prefix
	}
}

func ConsumerGroupName(name string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupName = name
	}
}

func ConsumerType(consumer int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerType = consumer
	}
}

func ProducerFlushFrequency(frequency int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushFrequency = frequency
	}
}

func ProducerFlushMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMessages = num
	}
}

func ProducerFlushMaxMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMaxmessages = num
	}
}

func ProducerMaxRetries(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryMax = num
	}
}

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryBackOff = duration
	}
}

func ProducerReturnOnErrors(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnErrors = opt
	}
}

func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnSuccess = opt
	}
}

func ConsumerMaxWait(wait int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerMaxwait = wait
	}
}

func MaxProcessingTime(pTime int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.maxProcessingTime = pTime
	}
}

func NumPartitions(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numPartitions = number
	}
}

func NumReplicas(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numReplicas = number
	}
}

func AutoCreateTopic(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.autoCreateTopic = opt
	}
}

func MetadatMaxRetries(retry int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.metadataMaxRetry = retry
	}
}

func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.livenessChannelInterval = opt
	}
}

func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaHost: DefaultKafkaHost,
		KafkaPort: DefaultKafkaPort,
	}
	client.consumerType = DefaultConsumerType
	client.producerFlushFrequency = DefaultProducerFlushFrequency
	client.producerFlushMessages = DefaultProducerFlushMessages
	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
	client.producerReturnErrors = DefaultProducerReturnErrors
	client.producerReturnSuccess = DefaultProducerReturnSuccess
	client.producerRetryMax = DefaultProducerRetryMax
	client.producerRetryBackOff = DefaultProducerRetryBackoff
	client.consumerMaxwait = DefaultConsumerMaxwait
	client.maxProcessingTime = DefaultMaxProcessingTime
	client.numPartitions = DefaultNumberPartitions
	client.numReplicas = DefaultNumberReplicas
	client.autoCreateTopic = DefaultAutoCreateTopic
	client.metadataMaxRetry = DefaultMetadataMaxRetry
	client.livenessChannelInterval = DefaultLivenessChannelInterval

	for _, option := range opts {
		option(client)
	}

	client.groupConsumers = make(map[string]*scc.Consumer)

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
	client.topicLockMap = make(map[string]*sync.RWMutex)
	client.lockOfTopicLockMap = sync.RWMutex{}
	client.lockOfGroupConsumers = sync.RWMutex{}

	// healthy and alive until proven otherwise
	client.alive = true
	client.healthy = true

	return client
}
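
// Illustrative usage (a sketch, not part of this package's API surface): build
// and start a client with functional options. The broker address and option
// values below are assumptions chosen for demonstration only.
//
//	client := NewSaramaClient(
//		Host("kafka.voltha.svc"),
//		Port(9092),
//		ConsumerType(GroupCustomer),
//		ProducerReturnOnSuccess(true),
//	)
//	if err := client.Start(); err != nil {
//		// handle startup failure; Start's deferred cleanup has already run Stop()
//	}
//	defer client.Stop()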

func (sc *SaramaClient) Start() error {
	log.Info("Starting-kafka-sarama-client")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	var err error

	// Add a cleanup in case of failure to startup
	defer func() {
		if err != nil {
			sc.Stop()
		}
	}()

	// Create the Cluster Admin
	if err = sc.createClusterAdmin(); err != nil {
		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
		return err
	}

	// Create the Publisher. Assign to the outer err (not a shadowed copy) so the
	// deferred cleanup above can observe the failure.
	if err = sc.createPublisher(); err != nil {
		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	if sc.consumerType == DefaultConsumerType {
		// Create the master consumers
		if err = sc.createConsumer(); err != nil {
			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
			return err
		}
	}

	// Create the topic to consumers/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	log.Info("kafka-sarama-client-started")

	sc.started = true

	return nil
}

func (sc *SaramaClient) Stop() {
	log.Info("stopping-sarama-client")

	sc.started = false

	// Send a message over the done channel to close all long running routines
	sc.doneCh <- 1

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			log.Errorw("closing-producer-failed", log.Fields{"error": err})
		}
	}

	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
		}
	}

	for key, val := range sc.groupConsumers {
		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
		if err := val.Close(); err != nil {
			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
		}
	}

	if sc.cAdmin != nil {
		if err := sc.cAdmin.Close(); err != nil {
			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
		}
	}

	//TODO: Clear the consumers map
	//sc.clearConsumerChannelMap()

	log.Info("sarama-client-stopped")
}

// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required
// as the invoking function must hold the lock.
func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
	// Set the topic details
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)

	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
		if err == sarama.ErrTopicAlreadyExists {
			// Not an error
			log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("create-topic-failure", log.Fields{"error": err})
		return err
	}
	// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
	// do so.
	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}

// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a per-topic lock to
// ensure that no two goroutines perform operations on the same topic concurrently.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	return sc.createTopic(topic, numPartition, repFactor)
}

// DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	// Remove the topic from the broker
	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
		if err == sarama.ErrUnknownTopicOrPartition {
			// Not an error: the topic does not exist
			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
		return err
	}

	// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic.
func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific to this subscriber and add it to the consumer channel map
		ch := make(chan *ic.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan *ic.InterContainerMessage
	var err error

	// Use the consumerType option to figure out the type of consumer to launch
	if sc.consumerType == PartitionConsumer {
		if sc.autoCreateTopic {
			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
	} else if sc.consumerType == GroupCustomer {
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		//if sc.autoCreateTopic {
		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
		//		return nil, err
		//	}
		//}
		groupId := getGroupId(kvArgs...)
		// Include the group prefix
		if groupId != "" {
			groupId = sc.consumerGroupPrefix + groupId
		} else {
			// Need to use a unique group Id per topic
			groupId = sc.consumerGroupPrefix + topic.Name
		}
		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
			return nil, err
		}

	} else {
		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return consumerListeningChannel, nil
}
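
// Illustrative usage (a sketch): subscribe with an explicit consumer group id
// and starting offset passed as KVArgs, then drain the returned channel. It
// assumes a started *SaramaClient named client; the topic and group names are
// placeholders.
//
//	topic := &Topic{Name: "rwcore"}
//	ch, err := client.Subscribe(topic,
//		&KVArg{Key: GroupIdKey, Value: "my-group"},
//		&KVArg{Key: Offset, Value: sarama.OffsetOldest})
//	if err != nil {
//		return err
//	}
//	go func() {
//		for msg := range ch {
//			handle(msg) // handle is a hypothetical application callback
//		}
//	}()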

// UnSubscribe removes a subscriber channel from the given topic and closes the topic's
// group consumer, if one exists.
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	var err error
	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
		log.Errorw("failed-removing-channel", log.Fields{"error": err})
	}
	if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
	}
	return err
}

func (sc *SaramaClient) updateLiveness(alive bool) {
	// Post a consistent stream of liveness data to the channel,
	// so that in a live state, the core does not timeout and
	// send a forced liveness message. Production of liveness
	// events to the channel is rate-limited by livenessChannelInterval.
	if sc.liveness != nil {
		if sc.alive != alive {
			log.Info("update-liveness-channel-because-change")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
			log.Info("update-liveness-channel-because-interval")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		}
	}

	// Only emit a log message when the state changes
	if sc.alive != alive {
		log.Info("set-client-alive", log.Fields{"alive": alive})
		sc.alive = alive
	}
}

// Once unhealthy, we never go back
func (sc *SaramaClient) setUnhealthy() {
	sc.healthy = false
	if sc.healthiness != nil {
		log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
		sc.healthiness <- sc.healthy
	}
}

// isLivenessError returns true if the given error indicates a loss of connectivity to the
// broker, i.e. one that should mark the client as not alive.
func (sc *SaramaClient) isLivenessError(err error) bool {
	// Sarama producers and consumers encapsulate the error inside
	// a ProducerError or ConsumerError struct.
	if prodError, ok := err.(*sarama.ProducerError); ok {
		err = prodError.Err
	} else if consumerError, ok := err.(*sarama.ConsumerError); ok {
		err = consumerError.Err
	}

	// Sarama-Cluster wraps the error inside a ClusterError struct, which we cannot
	// compare by reference. The best we can do is compare the error strings.

	switch err.Error() {
	case context.DeadlineExceeded.Error():
		log.Info("is-liveness-error-timeout")
		return true
	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
		log.Info("is-liveness-error-no-brokers")
		return true
	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
		log.Info("is-liveness-error-shutting-down")
		return true
	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
		log.Info("is-liveness-error-not-available")
		return true
	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
		log.Info("is-liveness-error-circuit-breaker-open")
		return true
	}

	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
		log.Info("is-liveness-error-connection-refused")
		return true
	}

	if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
		log.Info("is-liveness-error-io-timeout")
		return true
	}

	// Other errors shouldn't trigger a loss of liveness

	log.Infow("is-liveness-error-ignored", log.Fields{"err": err})

	return false
}

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
		return fmt.Errorf("not-a-proto-msg-%s", msg)
	}

	var marshalled []byte
	var err error
	// Create the Sarama producer message
	if marshalled, err = proto.Marshal(protoMsg); err != nil {
		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
		return err
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
		sc.updateLiveness(true)
	case notOk := <-sc.producer.Errors():
		log.Debugw("error-sending", log.Fields{"status": notOk})
		if sc.isLivenessError(notOk) {
			sc.updateLiveness(false)
		}
		return notOk
	}
	return nil
}
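
// Illustrative usage (a sketch): publish a proto message with an optional
// routing key. The message type, topic, and key are placeholders; any
// proto.Message is accepted.
//
//	msg := &ic.InterContainerMessage{}
//	if err := client.Send(msg, &Topic{Name: "rwcore"}, "device-1234"); err != nil {
//		log.Errorw("send-failed", log.Fields{"error": err})
//	}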

// EnableLivenessChannel enables the liveness monitor channel. This channel will report
// a "true" or "false" on every publish, which indicates whether
// or not the channel is still live. This channel is then picked up
// by the service (i.e. rw_core / ro_core) to update readiness status
// and/or take other actions.
func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
	log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
	if enable {
		if sc.liveness == nil {
			log.Info("kafka-create-liveness-channel")
			// At least 1, so we can immediately post to it without blocking
			// Setting a bigger number (10) allows the monitor to fall behind
			// without blocking others. The monitor shouldn't really fall
			// behind...
			sc.liveness = make(chan bool, 10)
			// post initial state to the channel
			sc.liveness <- sc.alive
		}
	} else {
		// TODO: Think about whether we need the ability to turn off
		// liveness monitoring
		panic("Turning off liveness reporting is not supported")
	}
	return sc.liveness
}
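
// Illustrative usage (a sketch): drain the liveness channel from a monitor
// goroutine. What is done with each value (e.g. updating a Kubernetes
// readiness probe) is application-specific and assumed here.
//
//	liveness := client.EnableLivenessChannel(true)
//	go func() {
//		for alive := range liveness {
//			log.Infow("kafka-liveness-event", log.Fields{"alive": alive})
//		}
//	}()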

// EnableHealthinessChannel enables the healthiness monitor channel. This channel will report "false"
// if the kafka consumers die, or some other problem occurs which is
// catastrophic that would require re-creating the client.
func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
	log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
	if enable {
		if sc.healthiness == nil {
			log.Info("kafka-create-healthiness-channel")
			// At least 1, so we can immediately post to it without blocking
			// Setting a bigger number (10) allows the monitor to fall behind
			// without blocking others. The monitor shouldn't really fall
			// behind...
			sc.healthiness = make(chan bool, 10)
			// post initial state to the channel
			sc.healthiness <- sc.healthy
		}
	} else {
		// TODO: Think about whether we need the ability to turn off
		// healthiness monitoring
		panic("Turning off healthiness reporting is not supported")
	}
	return sc.healthiness
}

// SendLiveness sends an empty message on the liveness channel to check whether connectivity has
// been restored.
func (sc *SaramaClient) SendLiveness() error {
	if !sc.started {
		return fmt.Errorf("SendLiveness() called while not started")
	}

	kafkaMsg := &sarama.ProducerMessage{
		Topic: "_liveness_test",
		Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
		sc.updateLiveness(true)
	case notOk := <-sc.producer.Errors():
		log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
		if sc.isLivenessError(notOk) {
			sc.updateLiveness(false)
		}
		return notOk
	}
	return nil
}
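
// Illustrative usage (a sketch): after liveness is lost, a caller can probe for
// recovery by periodically publishing a test message. The 10s interval is an
// assumption.
//
//	ticker := time.NewTicker(10 * time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		if err := client.SendLiveness(); err != nil {
//			log.Warnw("liveness-probe-failed", log.Fields{"error": err})
//		}
//	}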

// getGroupId returns the group id from the key-value args.
func getGroupId(kvArgs ...*KVArg) string {
	for _, arg := range kvArgs {
		if arg.Key == GroupIdKey {
			return arg.Value.(string)
		}
	}
	return ""
}

// getOffset returns the offset from the key-value args.
func getOffset(kvArgs ...*KVArg) int64 {
	for _, arg := range kvArgs {
		if arg.Key == Offset {
			return arg.Value.(int64)
		}
	}
	return sarama.OffsetNewest
}

func (sc *SaramaClient) createClusterAdmin() error {
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Create a cluster Admin
	var cAdmin sarama.ClusterAdmin
	var err error
	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
		return err
	}
	sc.cAdmin = cAdmin
	return nil
}

// lockTopic acquires the per-topic lock, creating it on first use. The map of per-topic
// locks is itself guarded by lockOfTopicLockMap.
func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	if _, exist := sc.topicLockMap[topic.Name]; !exist {
		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
	}
	sc.lockOfTopicLockMap.Unlock()
	sc.topicLockMap[topic.Name].Lock()
}

// unLockTopic releases the per-topic lock, if one exists.
func (sc *SaramaClient) unLockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	defer sc.lockOfTopicLockMap.Unlock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.topicLockMap[topic.Name].Unlock()
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
		delete(sc.topicToConsumerChannelMap, id)
	}
}

func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}

// closeConsumers closes a list of sarama consumers. Each consumer can be either a partition
// consumer or a group consumer.
func closeConsumers(consumers []interface{}) error {
	var err error
	for _, consumer := range consumers {
		// Is it a partition consumer?
		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			if errTemp := partitionConsumer.Close(); errTemp != nil {
				log.Debugw("partition-consumer-close-error", log.Fields{"err": errTemp})
				if errTemp.Error() == sarama.ErrUnknownTopicOrPartition.Error() {
					// This can occur during a race condition and is not an error
					err = nil
				} else {
					err = errTemp
				}
			}
		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
			if errTemp := groupConsumer.Close(); errTemp != nil {
				if errTemp.Error() == sarama.ErrUnknownTopicOrPartition.Error() {
					// This can occur during a race condition and is not an error
					err = nil
				} else {
					err = errTemp
				}
			}
		}
	}
	return err
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(consumerCh.channels, ch)
		// If there are no more channels then we can close the consumers themselves
		if len(consumerCh.channels) == 0 {
			log.Debugw("closing-consumers", log.Fields{"topic": topic})
			err := closeConsumers(consumerCh.consumers)
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// removeChannel closes each channel and returns the shortened slice; keep
		// reassigning until every channel is closed. (Ranging over the slice while
		// removeChannel mutates it could close the same channel twice.)
		for len(consumerCh.channels) > 0 {
			consumerCh.channels = removeChannel(consumerCh.channels, consumerCh.channels[0])
		}
		err := closeConsumers(consumerCh.consumers)
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return nil
}

func (sc *SaramaClient) clearConsumerChannelMap() error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	var err error
	for topic, consumerCh := range sc.topicToConsumerChannelMap {
		// removeChannel closes each channel and returns the shortened slice; keep
		// reassigning until every channel is closed.
		for len(consumerCh.channels) > 0 {
			consumerCh.channels = removeChannel(consumerCh.channels, consumerCh.channels[0])
		}
		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
			err = errTemp
		}
		delete(sc.topicToConsumerChannelMap, topic)
	}
	return err
}

// createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
	config.Producer.Flush.Messages = sc.producerFlushMessages
	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
	config.Producer.Return.Errors = sc.producerReturnErrors
	config.Producer.Return.Successes = sc.producerReturnSuccess
	//config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.RequiredAcks = sarama.WaitForLocal

	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Errorw("error-starting-publisher", log.Fields{"error": err})
		return err
	}
	sc.producer = producer
	log.Info("Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer() error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Metadata.Retry.Max = sc.metadataMaxRetry
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Errorw("error-starting-consumers", log.Fields{"error": err})
		return err
	}
	sc.consumer = consumer
	log.Info("Kafka-consumers-created")
	return nil
}

// createGroupConsumer creates a group consumer
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.ClientID = uuid.New().String()
	config.Group.Mode = scc.ConsumerModeMultiplex
	config.Consumer.Group.Heartbeat.Interval = time.Second
	config.Consumer.Return.Errors = true
	//config.Group.Return.Notifications = false
	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = initialOffset
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
		return nil, err
	}
	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})

	sc.addToGroupConsumers(topic.Name, consumer)
	return consumer, nil
}

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()
	for _, ch := range consumerCh.channels {
		go func(c chan *ic.InterContainerMessage) {
			c <- protoMessage
		}(ch)
	}
}

func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				if sc.isLivenessError(err) {
					sc.updateLiveness(false)
					log.Warnw("partition-consumers-error", log.Fields{"error": err})
				}
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			sc.updateLiveness(true)
			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
		case <-sc.doneCh:
			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy()
}

func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				if sc.isLivenessError(err) {
					sc.updateLiveness(false)
				}
				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
				// Channel closed
				break startloop
			}
			sc.updateLiveness(true)
			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy()
}

func (sc *SaramaClient) startConsumers(topic *Topic) error {
	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
		return errors.New("consumers-not-exist")
	}
	// For each consumer listening for that topic, start a consumption loop
	for _, consumer := range consumerCh.consumers {
		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
		} else {
			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
			return errors.New("invalid-consumer")
		}
	}
	return nil
}

// setupPartitionConsumerChannel creates a consumerChannels object for the topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	var pConsumers []sarama.PartitionConsumer
	var err error

	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	consumersIf := make([]interface{}, 0)
	for _, pConsumer := range pConsumers {
		consumersIf = append(consumersIf, pConsumer)
	}

	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: consumersIf,
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

// setupGroupConsumerChannel creates a consumerChannels object for the topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	// TODO: Replace this development partition consumers with a group consumers
	var pConsumer *scc.Consumer
	var err error
	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
		log.Errorw("creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: []interface{}{pConsumer},
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	pConsumers := make([]sarama.PartitionConsumer, 0)
	for _, partition := range partitionList {
		var pConsumer sarama.PartitionConsumer
		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
			log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
		pConsumers = append(pConsumers, pConsumer)
	}
	return pConsumers, nil
}

// removeChannel closes the given channel and removes it from the slice by swapping it with the
// last element and truncating. The input slice is mutated; callers must use the returned slice.
func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
	var i int
	var channel chan *ic.InterContainerMessage
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			log.Debug("channel-closed")
			return channels[:len(channels)-1]
		}
	}
	return channels
}

func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; !exist {
		sc.groupConsumers[topic] = consumer
	}
}

func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; exist {
		consumer := sc.groupConsumers[topic]
		delete(sc.groupConsumers, topic)
		if err := consumer.Close(); err != nil {
			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
			return err
		}
	}
	return nil
}