blob: ca88dfd9fe03738ba96ae474bdb68ecf2a842163 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
kdarapub26b4502019-10-05 03:02:33 +053022 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040023 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000024 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040025 "github.com/golang/protobuf/proto"
26 "github.com/google/uuid"
Scott Baker51290152019-10-24 14:23:20 -070027 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerc6e54cb2019-11-04 09:31:25 -080028 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
kdarapub26b4502019-10-05 03:02:33 +053029 "strings"
30 "sync"
31 "time"
William Kurkianea869482019-04-09 15:16:11 -040032)
33
34func init() {
35 log.AddPackage(log.JSON, log.DebugLevel, nil)
36}
37
// returnErrorFunction is the signature of a parameterless callback that
// reports its outcome as an error.
type returnErrorFunction func() error
39
40// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
41// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
42//consumer or a group consumer
43type consumerChannels struct {
44 consumers []interface{}
45 channels []chan *ic.InterContainerMessage
46}
47
48// SaramaClient represents the messaging proxy
49type SaramaClient struct {
50 cAdmin sarama.ClusterAdmin
51 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
56 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040057 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040058 consumerGroupPrefix string
59 consumerType int
60 consumerGroupName string
61 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
73 doneCh chan int
74 topicToConsumerChannelMap map[string]*consumerChannels
75 lockTopicToConsumerChannelMap sync.RWMutex
76 topicLockMap map[string]*sync.RWMutex
77 lockOfTopicLockMap sync.RWMutex
Mahir Gunyele77977b2019-06-27 05:36:22 -070078 metadataMaxRetry int
cbabu95f21522019-11-13 14:25:18 +010079 alive bool
80 liveness chan bool
81 livenessChannelInterval time.Duration
82 lastLivenessTime time.Time
83 started bool
Scott Baker86fce9a2019-12-12 09:47:17 -080084 healthy bool
85 healthiness chan bool
William Kurkianea869482019-04-09 15:16:11 -040086}
87
88type SaramaClientOption func(*SaramaClient)
89
90func Host(host string) SaramaClientOption {
91 return func(args *SaramaClient) {
92 args.KafkaHost = host
93 }
94}
95
96func Port(port int) SaramaClientOption {
97 return func(args *SaramaClient) {
98 args.KafkaPort = port
99 }
100}
101
102func ConsumerGroupPrefix(prefix string) SaramaClientOption {
103 return func(args *SaramaClient) {
104 args.consumerGroupPrefix = prefix
105 }
106}
107
108func ConsumerGroupName(name string) SaramaClientOption {
109 return func(args *SaramaClient) {
110 args.consumerGroupName = name
111 }
112}
113
114func ConsumerType(consumer int) SaramaClientOption {
115 return func(args *SaramaClient) {
116 args.consumerType = consumer
117 }
118}
119
120func ProducerFlushFrequency(frequency int) SaramaClientOption {
121 return func(args *SaramaClient) {
122 args.producerFlushFrequency = frequency
123 }
124}
125
126func ProducerFlushMessages(num int) SaramaClientOption {
127 return func(args *SaramaClient) {
128 args.producerFlushMessages = num
129 }
130}
131
132func ProducerFlushMaxMessages(num int) SaramaClientOption {
133 return func(args *SaramaClient) {
134 args.producerFlushMaxmessages = num
135 }
136}
137
138func ProducerMaxRetries(num int) SaramaClientOption {
139 return func(args *SaramaClient) {
140 args.producerRetryMax = num
141 }
142}
143
144func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
145 return func(args *SaramaClient) {
146 args.producerRetryBackOff = duration
147 }
148}
149
150func ProducerReturnOnErrors(opt bool) SaramaClientOption {
151 return func(args *SaramaClient) {
152 args.producerReturnErrors = opt
153 }
154}
155
156func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
157 return func(args *SaramaClient) {
158 args.producerReturnSuccess = opt
159 }
160}
161
162func ConsumerMaxWait(wait int) SaramaClientOption {
163 return func(args *SaramaClient) {
164 args.consumerMaxwait = wait
165 }
166}
167
168func MaxProcessingTime(pTime int) SaramaClientOption {
169 return func(args *SaramaClient) {
170 args.maxProcessingTime = pTime
171 }
172}
173
174func NumPartitions(number int) SaramaClientOption {
175 return func(args *SaramaClient) {
176 args.numPartitions = number
177 }
178}
179
180func NumReplicas(number int) SaramaClientOption {
181 return func(args *SaramaClient) {
182 args.numReplicas = number
183 }
184}
185
186func AutoCreateTopic(opt bool) SaramaClientOption {
187 return func(args *SaramaClient) {
188 args.autoCreateTopic = opt
189 }
190}
191
Mahir Gunyele77977b2019-06-27 05:36:22 -0700192func MetadatMaxRetries(retry int) SaramaClientOption {
193 return func(args *SaramaClient) {
194 args.metadataMaxRetry = retry
195 }
196}
197
cbabu95f21522019-11-13 14:25:18 +0100198func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
199 return func(args *SaramaClient) {
200 args.livenessChannelInterval = opt
201 }
202}
203
William Kurkianea869482019-04-09 15:16:11 -0400204func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
205 client := &SaramaClient{
206 KafkaHost: DefaultKafkaHost,
207 KafkaPort: DefaultKafkaPort,
208 }
209 client.consumerType = DefaultConsumerType
210 client.producerFlushFrequency = DefaultProducerFlushFrequency
211 client.producerFlushMessages = DefaultProducerFlushMessages
212 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
213 client.producerReturnErrors = DefaultProducerReturnErrors
214 client.producerReturnSuccess = DefaultProducerReturnSuccess
215 client.producerRetryMax = DefaultProducerRetryMax
216 client.producerRetryBackOff = DefaultProducerRetryBackoff
217 client.consumerMaxwait = DefaultConsumerMaxwait
218 client.maxProcessingTime = DefaultMaxProcessingTime
219 client.numPartitions = DefaultNumberPartitions
220 client.numReplicas = DefaultNumberReplicas
221 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700222 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100223 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400224
225 for _, option := range opts {
226 option(client)
227 }
228
229 client.groupConsumers = make(map[string]*scc.Consumer)
230
231 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
232 client.topicLockMap = make(map[string]*sync.RWMutex)
233 client.lockOfTopicLockMap = sync.RWMutex{}
234 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100235
Scott Baker86fce9a2019-12-12 09:47:17 -0800236 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100237 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800238 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100239
William Kurkianea869482019-04-09 15:16:11 -0400240 return client
241}
242
243func (sc *SaramaClient) Start() error {
244 log.Info("Starting-kafka-sarama-client")
245
246 // Create the Done channel
247 sc.doneCh = make(chan int, 1)
248
249 var err error
250
Devmalya Paul495b94a2019-08-27 19:42:00 -0400251 // Add a cleanup in case of failure to startup
252 defer func() {
253 if err != nil {
254 sc.Stop()
255 }
256 }()
257
William Kurkianea869482019-04-09 15:16:11 -0400258 // Create the Cluster Admin
259 if err = sc.createClusterAdmin(); err != nil {
260 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
261 return err
262 }
263
264 // Create the Publisher
265 if err := sc.createPublisher(); err != nil {
266 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
267 return err
268 }
269
270 if sc.consumerType == DefaultConsumerType {
271 // Create the master consumers
272 if err := sc.createConsumer(); err != nil {
273 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
274 return err
275 }
276 }
277
278 // Create the topic to consumers/channel map
279 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
280
281 log.Info("kafka-sarama-client-started")
282
cbabu95f21522019-11-13 14:25:18 +0100283 sc.started = true
284
William Kurkianea869482019-04-09 15:16:11 -0400285 return nil
286}
287
288func (sc *SaramaClient) Stop() {
289 log.Info("stopping-sarama-client")
290
cbabu95f21522019-11-13 14:25:18 +0100291 sc.started = false
292
William Kurkianea869482019-04-09 15:16:11 -0400293 //Send a message over the done channel to close all long running routines
294 sc.doneCh <- 1
295
296 if sc.producer != nil {
297 if err := sc.producer.Close(); err != nil {
298 log.Errorw("closing-producer-failed", log.Fields{"error": err})
299 }
300 }
301
302 if sc.consumer != nil {
303 if err := sc.consumer.Close(); err != nil {
304 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
305 }
306 }
307
308 for key, val := range sc.groupConsumers {
309 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
310 if err := val.Close(); err != nil {
311 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
312 }
313 }
314
315 if sc.cAdmin != nil {
316 if err := sc.cAdmin.Close(); err != nil {
317 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
318 }
319 }
320
321 //TODO: Clear the consumers map
322 //sc.clearConsumerChannelMap()
323
324 log.Info("sarama-client-stopped")
325}
326
327//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
328// the invoking function must hold the lock
329func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
330 // Set the topic details
331 topicDetail := &sarama.TopicDetail{}
332 topicDetail.NumPartitions = int32(numPartition)
333 topicDetail.ReplicationFactor = int16(repFactor)
334 topicDetail.ConfigEntries = make(map[string]*string)
335 topicDetails := make(map[string]*sarama.TopicDetail)
336 topicDetails[topic.Name] = topicDetail
337
338 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
339 if err == sarama.ErrTopicAlreadyExists {
340 // Not an error
341 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
342 return nil
343 }
344 log.Errorw("create-topic-failure", log.Fields{"error": err})
345 return err
346 }
347 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
348 // do so.
349 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
350 return nil
351}
352
353//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
354// ensure no two go routines are performing operations on the same topic
355func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
356 sc.lockTopic(topic)
357 defer sc.unLockTopic(topic)
358
359 return sc.createTopic(topic, numPartition, repFactor)
360}
361
362//DeleteTopic removes a topic from the kafka Broker
363func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
364 sc.lockTopic(topic)
365 defer sc.unLockTopic(topic)
366
367 // Remove the topic from the broker
368 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
369 if err == sarama.ErrUnknownTopicOrPartition {
370 // Not an error as does not exist
371 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
372 return nil
373 }
374 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
375 return err
376 }
377
378 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
379 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
380 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
381 return err
382 }
383 return nil
384}
385
386// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
387// messages from that topic
388func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
389 sc.lockTopic(topic)
390 defer sc.unLockTopic(topic)
391
392 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
393
394 // If a consumers already exist for that topic then resuse it
395 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
396 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
397 // Create a channel specific for that consumers and add it to the consumers channel map
398 ch := make(chan *ic.InterContainerMessage)
399 sc.addChannelToConsumerChannelMap(topic, ch)
400 return ch, nil
401 }
402
403 // Register for the topic and set it up
404 var consumerListeningChannel chan *ic.InterContainerMessage
405 var err error
406
407 // Use the consumerType option to figure out the type of consumer to launch
408 if sc.consumerType == PartitionConsumer {
409 if sc.autoCreateTopic {
410 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
411 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
412 return nil, err
413 }
414 }
415 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
416 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
417 return nil, err
418 }
419 } else if sc.consumerType == GroupCustomer {
420 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
421 // does not consume from a precreated topic in some scenarios
422 //if sc.autoCreateTopic {
423 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
424 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
425 // return nil, err
426 // }
427 //}
428 //groupId := sc.consumerGroupName
429 groupId := getGroupId(kvArgs...)
430 // Include the group prefix
431 if groupId != "" {
432 groupId = sc.consumerGroupPrefix + groupId
433 } else {
434 // Need to use a unique group Id per topic
435 groupId = sc.consumerGroupPrefix + topic.Name
436 }
437 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
438 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
439 return nil, err
440 }
441
442 } else {
443 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
444 return nil, errors.New("unknown-consumer-type")
445 }
446
447 return consumerListeningChannel, nil
448}
449
450//UnSubscribe unsubscribe a consumer from a given topic
451func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
452 sc.lockTopic(topic)
453 defer sc.unLockTopic(topic)
454
455 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
456 var err error
457 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
458 log.Errorw("failed-removing-channel", log.Fields{"error": err})
459 }
460 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
461 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
462 }
463 return err
464}
465
cbabu95f21522019-11-13 14:25:18 +0100466func (sc *SaramaClient) updateLiveness(alive bool) {
467 // Post a consistent stream of liveness data to the channel,
468 // so that in a live state, the core does not timeout and
469 // send a forced liveness message. Production of liveness
470 // events to the channel is rate-limited by livenessChannelInterval.
471 if sc.liveness != nil {
472 if sc.alive != alive {
473 log.Info("update-liveness-channel-because-change")
474 sc.liveness <- alive
475 sc.lastLivenessTime = time.Now()
476 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
477 log.Info("update-liveness-channel-because-interval")
478 sc.liveness <- alive
479 sc.lastLivenessTime = time.Now()
480 }
481 }
482
483 // Only emit a log message when the state changes
484 if sc.alive != alive {
485 log.Info("set-client-alive", log.Fields{"alive": alive})
486 sc.alive = alive
487 }
488}
489
Scott Baker86fce9a2019-12-12 09:47:17 -0800490// Once unhealthy, we never go back
491func (sc *SaramaClient) setUnhealthy() {
492 sc.healthy = false
493 if sc.healthiness != nil {
494 log.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
495 sc.healthiness <- sc.healthy
496 }
497}
498
Devmalya Pauldd23a992019-11-14 07:06:31 +0000499func (sc *SaramaClient) isLivenessError(err error) bool {
500 // Sarama producers and consumers encapsulate the error inside
501 // a ProducerError or ConsumerError struct.
502 if prodError, ok := err.(*sarama.ProducerError); ok {
503 err = prodError.Err
504 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
505 err = consumerError.Err
506 }
507
508 // Sarama-Cluster will compose the error into a ClusterError struct,
509 // which we can't do a compare by reference. To handle that, we the
510 // best we can do is compare the error strings.
511
512 switch err.Error() {
513 case context.DeadlineExceeded.Error():
514 log.Info("is-liveness-error-timeout")
515 return true
516 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
517 log.Info("is-liveness-error-no-brokers")
518 return true
519 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
520 log.Info("is-liveness-error-shutting-down")
521 return true
522 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
523 log.Info("is-liveness-error-not-available")
524 return true
525 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
526 log.Info("is-liveness-error-circuit-breaker-open")
527 return true
528 }
529
530 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
531 log.Info("is-liveness-error-connection-refused")
532 return true
533 }
534
535 // Other errors shouldn't trigger a loss of liveness
536
537 log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
538
539 return false
540}
541
William Kurkianea869482019-04-09 15:16:11 -0400542// send formats and sends the request onto the kafka messaging bus.
543func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
544
545 // Assert message is a proto message
546 var protoMsg proto.Message
547 var ok bool
548 // ascertain the value interface type is a proto.Message
549 if protoMsg, ok = msg.(proto.Message); !ok {
550 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
551 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
552 }
553
554 var marshalled []byte
555 var err error
556 // Create the Sarama producer message
557 if marshalled, err = proto.Marshal(protoMsg); err != nil {
558 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
559 return err
560 }
561 key := ""
562 if len(keys) > 0 {
563 key = keys[0] // Only the first key is relevant
564 }
565 kafkaMsg := &sarama.ProducerMessage{
566 Topic: topic.Name,
567 Key: sarama.StringEncoder(key),
568 Value: sarama.ByteEncoder(marshalled),
569 }
570
571 // Send message to kafka
572 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400573 // Wait for result
574 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
575 select {
576 case ok := <-sc.producer.Successes():
577 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100578 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -0400579 case notOk := <-sc.producer.Errors():
580 log.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000581 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100582 sc.updateLiveness(false)
583 }
584 return notOk
585 }
586 return nil
587}
588
589// Enable the liveness monitor channel. This channel will report
590// a "true" or "false" on every publish, which indicates whether
591// or not the channel is still live. This channel is then picked up
592// by the service (i.e. rw_core / ro_core) to update readiness status
593// and/or take other actions.
594func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
595 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
596 if enable {
597 if sc.liveness == nil {
598 log.Info("kafka-create-liveness-channel")
599 // At least 1, so we can immediately post to it without blocking
600 // Setting a bigger number (10) allows the monitor to fall behind
601 // without blocking others. The monitor shouldn't really fall
602 // behind...
603 sc.liveness = make(chan bool, 10)
604 // post intial state to the channel
605 sc.liveness <- sc.alive
606 }
607 } else {
608 // TODO: Think about whether we need the ability to turn off
609 // liveness monitoring
610 panic("Turning off liveness reporting is not supported")
611 }
612 return sc.liveness
613}
614
Scott Baker86fce9a2019-12-12 09:47:17 -0800615// Enable the Healthiness monitor channel. This channel will report "false"
616// if the kafka consumers die, or some other problem occurs which is
617// catastrophic that would require re-creating the client.
618func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
619 log.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
620 if enable {
621 if sc.healthiness == nil {
622 log.Info("kafka-create-healthiness-channel")
623 // At least 1, so we can immediately post to it without blocking
624 // Setting a bigger number (10) allows the monitor to fall behind
625 // without blocking others. The monitor shouldn't really fall
626 // behind...
627 sc.healthiness = make(chan bool, 10)
628 // post intial state to the channel
629 sc.healthiness <- sc.healthy
630 }
631 } else {
632 // TODO: Think about whether we need the ability to turn off
633 // liveness monitoring
634 panic("Turning off healthiness reporting is not supported")
635 }
636 return sc.healthiness
637}
638
cbabu95f21522019-11-13 14:25:18 +0100639// send an empty message on the liveness channel to check whether connectivity has
640// been restored.
641func (sc *SaramaClient) SendLiveness() error {
642 if !sc.started {
643 return fmt.Errorf("SendLiveness() called while not started")
644 }
645
646 kafkaMsg := &sarama.ProducerMessage{
647 Topic: "_liveness_test",
648 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
649 }
650
651 // Send message to kafka
652 sc.producer.Input() <- kafkaMsg
653 // Wait for result
654 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
655 select {
656 case ok := <-sc.producer.Successes():
657 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
658 sc.updateLiveness(true)
659 case notOk := <-sc.producer.Errors():
660 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000661 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100662 sc.updateLiveness(false)
663 }
William Kurkianea869482019-04-09 15:16:11 -0400664 return notOk
665 }
666 return nil
667}
668
669// getGroupId returns the group id from the key-value args.
670func getGroupId(kvArgs ...*KVArg) string {
671 for _, arg := range kvArgs {
672 if arg.Key == GroupIdKey {
673 return arg.Value.(string)
674 }
675 }
676 return ""
677}
678
679// getOffset returns the offset from the key-value args.
680func getOffset(kvArgs ...*KVArg) int64 {
681 for _, arg := range kvArgs {
682 if arg.Key == Offset {
683 return arg.Value.(int64)
684 }
685 }
686 return sarama.OffsetNewest
687}
688
689func (sc *SaramaClient) createClusterAdmin() error {
690 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
691 config := sarama.NewConfig()
692 config.Version = sarama.V1_0_0_0
693
694 // Create a cluster Admin
695 var cAdmin sarama.ClusterAdmin
696 var err error
697 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
698 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
699 return err
700 }
701 sc.cAdmin = cAdmin
702 return nil
703}
704
705func (sc *SaramaClient) lockTopic(topic *Topic) {
706 sc.lockOfTopicLockMap.Lock()
707 if _, exist := sc.topicLockMap[topic.Name]; exist {
708 sc.lockOfTopicLockMap.Unlock()
709 sc.topicLockMap[topic.Name].Lock()
710 } else {
711 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
712 sc.lockOfTopicLockMap.Unlock()
713 sc.topicLockMap[topic.Name].Lock()
714 }
715}
716
717func (sc *SaramaClient) unLockTopic(topic *Topic) {
718 sc.lockOfTopicLockMap.Lock()
719 defer sc.lockOfTopicLockMap.Unlock()
720 if _, exist := sc.topicLockMap[topic.Name]; exist {
721 sc.topicLockMap[topic.Name].Unlock()
722 }
723}
724
725func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
726 sc.lockTopicToConsumerChannelMap.Lock()
727 defer sc.lockTopicToConsumerChannelMap.Unlock()
728 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
729 sc.topicToConsumerChannelMap[id] = arg
730 }
731}
732
733func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
734 sc.lockTopicToConsumerChannelMap.Lock()
735 defer sc.lockTopicToConsumerChannelMap.Unlock()
736 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
737 delete(sc.topicToConsumerChannelMap, id)
738 }
739}
740
741func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
742 sc.lockTopicToConsumerChannelMap.RLock()
743 defer sc.lockTopicToConsumerChannelMap.RUnlock()
744
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 return consumerCh
747 }
748 return nil
749}
750
751func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
752 sc.lockTopicToConsumerChannelMap.Lock()
753 defer sc.lockTopicToConsumerChannelMap.Unlock()
754 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
755 consumerCh.channels = append(consumerCh.channels, ch)
756 return
757 }
758 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
759}
760
761//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
762func closeConsumers(consumers []interface{}) error {
763 var err error
764 for _, consumer := range consumers {
765 // Is it a partition consumers?
766 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
767 if errTemp := partionConsumer.Close(); errTemp != nil {
768 log.Debugw("partition!!!", log.Fields{"err": errTemp})
769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
777 if errTemp := groupConsumer.Close(); errTemp != nil {
778 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
779 // This can occur on race condition
780 err = nil
781 } else {
782 err = errTemp
783 }
784 }
785 }
786 }
787 return err
788}
789
790func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
791 sc.lockTopicToConsumerChannelMap.Lock()
792 defer sc.lockTopicToConsumerChannelMap.Unlock()
793 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
794 // Channel will be closed in the removeChannel method
795 consumerCh.channels = removeChannel(consumerCh.channels, ch)
796 // If there are no more channels then we can close the consumers itself
797 if len(consumerCh.channels) == 0 {
798 log.Debugw("closing-consumers", log.Fields{"topic": topic})
799 err := closeConsumers(consumerCh.consumers)
800 //err := consumerCh.consumers.Close()
801 delete(sc.topicToConsumerChannelMap, topic.Name)
802 return err
803 }
804 return nil
805 }
806 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
807 return errors.New("topic-does-not-exist")
808}
809
810func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
811 sc.lockTopicToConsumerChannelMap.Lock()
812 defer sc.lockTopicToConsumerChannelMap.Unlock()
813 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
814 for _, ch := range consumerCh.channels {
815 // Channel will be closed in the removeChannel method
816 removeChannel(consumerCh.channels, ch)
817 }
818 err := closeConsumers(consumerCh.consumers)
819 //if err == sarama.ErrUnknownTopicOrPartition {
820 // // Not an error
821 // err = nil
822 //}
823 //err := consumerCh.consumers.Close()
824 delete(sc.topicToConsumerChannelMap, topic.Name)
825 return err
826 }
827 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
828 return nil
829}
830
831func (sc *SaramaClient) clearConsumerChannelMap() error {
832 sc.lockTopicToConsumerChannelMap.Lock()
833 defer sc.lockTopicToConsumerChannelMap.Unlock()
834 var err error
835 for topic, consumerCh := range sc.topicToConsumerChannelMap {
836 for _, ch := range consumerCh.channels {
837 // Channel will be closed in the removeChannel method
838 removeChannel(consumerCh.channels, ch)
839 }
840 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
841 err = errTemp
842 }
843 //err = consumerCh.consumers.Close()
844 delete(sc.topicToConsumerChannelMap, topic)
845 }
846 return err
847}
848
849//createPublisher creates the publisher which is used to send a message onto kafka
850func (sc *SaramaClient) createPublisher() error {
851 // This Creates the publisher
852 config := sarama.NewConfig()
853 config.Producer.Partitioner = sarama.NewRandomPartitioner
854 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
855 config.Producer.Flush.Messages = sc.producerFlushMessages
856 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
857 config.Producer.Return.Errors = sc.producerReturnErrors
858 config.Producer.Return.Successes = sc.producerReturnSuccess
859 //config.Producer.RequiredAcks = sarama.WaitForAll
860 config.Producer.RequiredAcks = sarama.WaitForLocal
861
862 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
863 brokers := []string{kafkaFullAddr}
864
865 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
866 log.Errorw("error-starting-publisher", log.Fields{"error": err})
867 return err
868 } else {
869 sc.producer = producer
870 }
871 log.Info("Kafka-publisher-created")
872 return nil
873}
874
875func (sc *SaramaClient) createConsumer() error {
876 config := sarama.NewConfig()
877 config.Consumer.Return.Errors = true
878 config.Consumer.Fetch.Min = 1
879 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
880 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
881 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700882 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400883 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
884 brokers := []string{kafkaFullAddr}
885
886 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
887 log.Errorw("error-starting-consumers", log.Fields{"error": err})
888 return err
889 } else {
890 sc.consumer = consumer
891 }
892 log.Info("Kafka-consumers-created")
893 return nil
894}
895
896// createGroupConsumer creates a consumers group
897func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
898 config := scc.NewConfig()
899 config.ClientID = uuid.New().String()
900 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100901 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
902 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400903 //config.Group.Return.Notifications = false
904 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
905 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
906 config.Consumer.Offsets.Initial = initialOffset
907 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
908 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
909 brokers := []string{kafkaFullAddr}
910
911 topics := []string{topic.Name}
912 var consumer *scc.Consumer
913 var err error
914
915 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
916 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
917 return nil, err
918 }
919 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
920
921 //sc.groupConsumers[topic.Name] = consumer
922 sc.addToGroupConsumers(topic.Name, consumer)
923 return consumer, nil
924}
925
926// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
927// topic via the unique channel each subscriber received during subscription
928func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
929 // Need to go over all channels and publish messages to them - do we need to copy msg?
930 sc.lockTopicToConsumerChannelMap.RLock()
931 defer sc.lockTopicToConsumerChannelMap.RUnlock()
932 for _, ch := range consumerCh.channels {
933 go func(c chan *ic.InterContainerMessage) {
934 c <- protoMessage
935 }(ch)
936 }
937}
938
939func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
940 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
941startloop:
942 for {
943 select {
944 case err, ok := <-consumer.Errors():
945 if ok {
cbabu116b73f2019-12-10 17:56:32 +0530946 if sc.isLivenessError(err) {
947 sc.updateLiveness(false)
948 log.Warnw("partition-consumers-error", log.Fields{"error": err})
949 }
William Kurkianea869482019-04-09 15:16:11 -0400950 } else {
951 // Channel is closed
952 break startloop
953 }
954 case msg, ok := <-consumer.Messages():
955 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
956 if !ok {
957 // channel is closed
958 break startloop
959 }
960 msgBody := msg.Value
cbabu116b73f2019-12-10 17:56:32 +0530961 sc.updateLiveness(true)
962 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400963 icm := &ic.InterContainerMessage{}
964 if err := proto.Unmarshal(msgBody, icm); err != nil {
965 log.Warnw("partition-invalid-message", log.Fields{"error": err})
966 continue
967 }
968 go sc.dispatchToConsumers(consumerChnls, icm)
969 case <-sc.doneCh:
970 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
971 break startloop
972 }
973 }
974 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -0800975 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -0400976}
977
978func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
979 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
980
981startloop:
982 for {
983 select {
984 case err, ok := <-consumer.Errors():
985 if ok {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000986 if sc.isLivenessError(err) {
987 sc.updateLiveness(false)
988 }
William Kurkianea869482019-04-09 15:16:11 -0400989 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
990 } else {
cbabu95f21522019-11-13 14:25:18 +0100991 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400992 // channel is closed
993 break startloop
994 }
995 case msg, ok := <-consumer.Messages():
996 if !ok {
cbabu95f21522019-11-13 14:25:18 +0100997 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400998 // Channel closed
999 break startloop
1000 }
cbabu95f21522019-11-13 14:25:18 +01001001 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -04001002 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
1003 msgBody := msg.Value
1004 icm := &ic.InterContainerMessage{}
1005 if err := proto.Unmarshal(msgBody, icm); err != nil {
1006 log.Warnw("invalid-message", log.Fields{"error": err})
1007 continue
1008 }
1009 go sc.dispatchToConsumers(consumerChnls, icm)
1010 consumer.MarkOffset(msg, "")
1011 case ntf := <-consumer.Notifications():
1012 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
1013 case <-sc.doneCh:
1014 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
1015 break startloop
1016 }
1017 }
1018 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -08001019 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -04001020}
1021
1022func (sc *SaramaClient) startConsumers(topic *Topic) error {
1023 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
1024 var consumerCh *consumerChannels
1025 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
1026 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
1027 return errors.New("consumers-not-exist")
1028 }
1029 // For each consumer listening for that topic, start a consumption loop
1030 for _, consumer := range consumerCh.consumers {
1031 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1032 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1033 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1034 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1035 } else {
1036 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
1037 return errors.New("invalid-consumer")
1038 }
1039 }
1040 return nil
1041}
1042
1043//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1044//// for that topic. It also starts the routine that listens for messages on that topic.
1045func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1046 var pConsumers []sarama.PartitionConsumer
1047 var err error
1048
1049 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
1050 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1051 return nil, err
1052 }
1053
1054 consumersIf := make([]interface{}, 0)
1055 for _, pConsumer := range pConsumers {
1056 consumersIf = append(consumersIf, pConsumer)
1057 }
1058
1059 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1060 // unbuffered to verify race conditions.
1061 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1062 cc := &consumerChannels{
1063 consumers: consumersIf,
1064 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1065 }
1066
1067 // Add the consumers channel to the map
1068 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1069
1070 //Start a consumers to listen on that specific topic
1071 go sc.startConsumers(topic)
1072
1073 return consumerListeningChannel, nil
1074}
1075
1076// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1077// for that topic. It also starts the routine that listens for messages on that topic.
1078func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1079 // TODO: Replace this development partition consumers with a group consumers
1080 var pConsumer *scc.Consumer
1081 var err error
1082 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1083 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1084 return nil, err
1085 }
1086 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1087 // unbuffered to verify race conditions.
1088 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1089 cc := &consumerChannels{
1090 consumers: []interface{}{pConsumer},
1091 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1092 }
1093
1094 // Add the consumers channel to the map
1095 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1096
1097 //Start a consumers to listen on that specific topic
1098 go sc.startConsumers(topic)
1099
1100 return consumerListeningChannel, nil
1101}
1102
1103func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1104 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
1105 partitionList, err := sc.consumer.Partitions(topic.Name)
1106 if err != nil {
1107 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1108 return nil, err
1109 }
1110
1111 pConsumers := make([]sarama.PartitionConsumer, 0)
1112 for _, partition := range partitionList {
1113 var pConsumer sarama.PartitionConsumer
1114 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1115 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1116 return nil, err
1117 }
1118 pConsumers = append(pConsumers, pConsumer)
1119 }
1120 return pConsumers, nil
1121}
1122
1123func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1124 var i int
1125 var channel chan *ic.InterContainerMessage
1126 for i, channel = range channels {
1127 if channel == ch {
1128 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1129 close(channel)
1130 log.Debug("channel-closed")
1131 return channels[:len(channels)-1]
1132 }
1133 }
1134 return channels
1135}
1136
William Kurkianea869482019-04-09 15:16:11 -04001137func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1138 sc.lockOfGroupConsumers.Lock()
1139 defer sc.lockOfGroupConsumers.Unlock()
1140 if _, exist := sc.groupConsumers[topic]; !exist {
1141 sc.groupConsumers[topic] = consumer
1142 }
1143}
1144
1145func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1146 sc.lockOfGroupConsumers.Lock()
1147 defer sc.lockOfGroupConsumers.Unlock()
1148 if _, exist := sc.groupConsumers[topic]; exist {
1149 consumer := sc.groupConsumers[topic]
1150 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001151 if err := consumer.Close(); err != nil {
William Kurkianea869482019-04-09 15:16:11 -04001152 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1153 return err
1154 }
1155 }
1156 return nil
1157}