blob: 35ede44a0781ce374a5b031cd8dfcf72f3a021fb [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
36type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
khenaidoo7ff26c72019-01-16 14:55:48 -050055 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050056 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050057 consumerType int
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050071 doneCh chan int
72 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050076}
77
78type SaramaClientOption func(*SaramaClient)
79
80func Host(host string) SaramaClientOption {
81 return func(args *SaramaClient) {
82 args.KafkaHost = host
83 }
84}
85
86func Port(port int) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaPort = port
89 }
90}
91
khenaidooca301322019-01-09 23:06:32 -050092func ConsumerGroupPrefix(prefix string) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.consumerGroupPrefix = prefix
95 }
96}
97
98func ConsumerGroupName(name string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupName = name
101 }
102}
103
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500104func ConsumerType(consumer int) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerType = consumer
107 }
108}
109
110func ProducerFlushFrequency(frequency int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.producerFlushFrequency = frequency
113 }
114}
115
116func ProducerFlushMessages(num int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushMessages = num
119 }
120}
121
122func ProducerFlushMaxMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMaxmessages = num
125 }
126}
127
khenaidoo90847922018-12-03 14:47:51 -0500128func ProducerMaxRetries(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerRetryMax = num
131 }
132}
133
134func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryBackOff = duration
137 }
138}
139
140func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500141 return func(args *SaramaClient) {
142 args.producerReturnErrors = opt
143 }
144}
145
khenaidoo90847922018-12-03 14:47:51 -0500146func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500147 return func(args *SaramaClient) {
148 args.producerReturnSuccess = opt
149 }
150}
151
152func ConsumerMaxWait(wait int) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.consumerMaxwait = wait
155 }
156}
157
158func MaxProcessingTime(pTime int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.maxProcessingTime = pTime
161 }
162}
163
164func NumPartitions(number int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.numPartitions = number
167 }
168}
169
170func NumReplicas(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numReplicas = number
173 }
174}
175
176func AutoCreateTopic(opt bool) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.autoCreateTopic = opt
179 }
180}
181
khenaidoo43c82122018-11-22 18:38:28 -0500182func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
183 client := &SaramaClient{
184 KafkaHost: DefaultKafkaHost,
185 KafkaPort: DefaultKafkaPort,
186 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500187 client.consumerType = DefaultConsumerType
188 client.producerFlushFrequency = DefaultProducerFlushFrequency
189 client.producerFlushMessages = DefaultProducerFlushMessages
190 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
191 client.producerReturnErrors = DefaultProducerReturnErrors
192 client.producerReturnSuccess = DefaultProducerReturnSuccess
193 client.producerRetryMax = DefaultProducerRetryMax
194 client.producerRetryBackOff = DefaultProducerRetryBackoff
195 client.consumerMaxwait = DefaultConsumerMaxwait
196 client.maxProcessingTime = DefaultMaxProcessingTime
197 client.numPartitions = DefaultNumberPartitions
198 client.numReplicas = DefaultNumberReplicas
199 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500200
201 for _, option := range opts {
202 option(client)
203 }
204
khenaidooca301322019-01-09 23:06:32 -0500205 client.groupConsumers = make(map[string]*scc.Consumer)
206
khenaidoo43c82122018-11-22 18:38:28 -0500207 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500208 client.topicLockMap = make(map[string]*sync.RWMutex)
209 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500210 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500211 return client
212}
213
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500214func (sc *SaramaClient) Start() error {
215 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500216
217 // Create the Done channel
218 sc.doneCh = make(chan int, 1)
219
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500220 var err error
221
222 // Create the Cluster Admin
223 if err = sc.createClusterAdmin(); err != nil {
224 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
225 return err
226 }
227
khenaidoo43c82122018-11-22 18:38:28 -0500228 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500229 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500230 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
231 return err
232 }
233
khenaidooca301322019-01-09 23:06:32 -0500234 if sc.consumerType == DefaultConsumerType {
235 // Create the master consumers
236 if err := sc.createConsumer(); err != nil {
237 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
238 return err
239 }
khenaidoo43c82122018-11-22 18:38:28 -0500240 }
241
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500242 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500243 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
244
khenaidooca301322019-01-09 23:06:32 -0500245 log.Info("kafka-sarama-client-started")
246
khenaidoo43c82122018-11-22 18:38:28 -0500247 return nil
248}
249
250func (sc *SaramaClient) Stop() {
251 log.Info("stopping-sarama-client")
252
253 //Send a message over the done channel to close all long running routines
254 sc.doneCh <- 1
255
khenaidoo43c82122018-11-22 18:38:28 -0500256 if sc.producer != nil {
257 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500258 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500259 }
260 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500261
khenaidoo43c82122018-11-22 18:38:28 -0500262 if sc.consumer != nil {
263 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500264 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500265 }
266 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500267
khenaidooca301322019-01-09 23:06:32 -0500268 for key, val := range sc.groupConsumers {
269 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
270 if err := val.Close(); err != nil {
271 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500272 }
273 }
274
275 if sc.cAdmin != nil {
276 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500277 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500278 }
279 }
280
281 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500282 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500283
284 log.Info("sarama-client-stopped")
285}
286
khenaidooca301322019-01-09 23:06:32 -0500287//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
288// the invoking function must hold the lock
289func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500290 // Set the topic details
291 topicDetail := &sarama.TopicDetail{}
292 topicDetail.NumPartitions = int32(numPartition)
293 topicDetail.ReplicationFactor = int16(repFactor)
294 topicDetail.ConfigEntries = make(map[string]*string)
295 topicDetails := make(map[string]*sarama.TopicDetail)
296 topicDetails[topic.Name] = topicDetail
297
298 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
299 if err == sarama.ErrTopicAlreadyExists {
300 // Not an error
301 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
302 return nil
303 }
304 log.Errorw("create-topic-failure", log.Fields{"error": err})
305 return err
306 }
307 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
308 // do so.
309 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
310 return nil
311}
312
khenaidooca301322019-01-09 23:06:32 -0500313//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
314// ensure no two go routines are performing operations on the same topic
315func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
316 sc.lockTopic(topic)
317 defer sc.unLockTopic(topic)
318
319 return sc.createTopic(topic, numPartition, repFactor)
320}
321
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322//DeleteTopic removes a topic from the kafka Broker
323func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500324 sc.lockTopic(topic)
325 defer sc.unLockTopic(topic)
326
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Remove the topic from the broker
328 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
329 if err == sarama.ErrUnknownTopicOrPartition {
330 // Not an error as does not exist
331 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
332 return nil
333 }
334 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
335 return err
336 }
337
338 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
339 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
340 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
341 return err
342 }
343 return nil
344}
345
346// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
347// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500348func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500349 sc.lockTopic(topic)
350 defer sc.unLockTopic(topic)
351
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500352 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
353
354 // If a consumers already exist for that topic then resuse it
355 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
356 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
357 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500358 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359 sc.addChannelToConsumerChannelMap(topic, ch)
360 return ch, nil
361 }
362
363 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500364 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500365 var err error
366
367 // Use the consumerType option to figure out the type of consumer to launch
368 if sc.consumerType == PartitionConsumer {
369 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500370 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500371 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
372 return nil, err
373 }
374 }
375 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
376 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
377 return nil, err
378 }
379 } else if sc.consumerType == GroupCustomer {
380 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
381 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500382 //if sc.autoCreateTopic {
383 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
384 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
385 // return nil, err
386 // }
387 //}
388 //groupId := sc.consumerGroupName
389 groupId := getGroupId(kvArgs...)
390 // Include the group prefix
391 if groupId != "" {
392 groupId = sc.consumerGroupPrefix + groupId
393 } else {
394 // Need to use a unique group Id per topic
395 groupId = sc.consumerGroupPrefix + topic.Name
396 }
397 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
398 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500399 return nil, err
400 }
khenaidooca301322019-01-09 23:06:32 -0500401
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 } else {
403 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
404 return nil, errors.New("unknown-consumer-type")
405 }
406
407 return consumerListeningChannel, nil
408}
409
410//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500411func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500412 sc.lockTopic(topic)
413 defer sc.unLockTopic(topic)
414
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500415 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500416 var err error
417 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
418 log.Errorw("failed-removing-channel", log.Fields{"error": err})
419 }
420 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
421 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
422 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500423 return err
424}
425
426// send formats and sends the request onto the kafka messaging bus.
427func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
428
429 // Assert message is a proto message
430 var protoMsg proto.Message
431 var ok bool
432 // ascertain the value interface type is a proto.Message
433 if protoMsg, ok = msg.(proto.Message); !ok {
434 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
435 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
436 }
437
438 var marshalled []byte
439 var err error
440 // Create the Sarama producer message
441 if marshalled, err = proto.Marshal(protoMsg); err != nil {
442 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
443 return err
444 }
445 key := ""
446 if len(keys) > 0 {
447 key = keys[0] // Only the first key is relevant
448 }
449 kafkaMsg := &sarama.ProducerMessage{
450 Topic: topic.Name,
451 Key: sarama.StringEncoder(key),
452 Value: sarama.ByteEncoder(marshalled),
453 }
454
455 // Send message to kafka
456 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500457
458 // Wait for result
459 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
460 select {
461 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500462 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500463 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500464 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500465 return notOk
466 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500467 return nil
468}
469
khenaidooca301322019-01-09 23:06:32 -0500470// getGroupId returns the group id from the key-value args.
471func getGroupId(kvArgs ...*KVArg) string {
472 for _, arg := range kvArgs {
473 if arg.Key == GroupIdKey {
474 return arg.Value.(string)
475 }
476 }
477 return ""
478}
479
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500480func (sc *SaramaClient) createClusterAdmin() error {
481 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
482 config := sarama.NewConfig()
483 config.Version = sarama.V1_0_0_0
484
485 // Create a cluster Admin
486 var cAdmin sarama.ClusterAdmin
487 var err error
488 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
489 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
490 return err
491 }
492 sc.cAdmin = cAdmin
493 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500494}
495
khenaidood2b6df92018-12-13 16:37:20 -0500496func (sc *SaramaClient) lockTopic(topic *Topic) {
497 sc.lockOfTopicLockMap.Lock()
498 if _, exist := sc.topicLockMap[topic.Name]; exist {
499 sc.lockOfTopicLockMap.Unlock()
500 sc.topicLockMap[topic.Name].Lock()
501 } else {
502 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
503 sc.lockOfTopicLockMap.Unlock()
504 sc.topicLockMap[topic.Name].Lock()
505 }
506}
507
508func (sc *SaramaClient) unLockTopic(topic *Topic) {
509 sc.lockOfTopicLockMap.Lock()
510 defer sc.lockOfTopicLockMap.Unlock()
511 if _, exist := sc.topicLockMap[topic.Name]; exist {
512 sc.topicLockMap[topic.Name].Unlock()
513 }
514}
515
khenaidoo43c82122018-11-22 18:38:28 -0500516func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
517 sc.lockTopicToConsumerChannelMap.Lock()
518 defer sc.lockTopicToConsumerChannelMap.Unlock()
519 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
520 sc.topicToConsumerChannelMap[id] = arg
521 }
522}
523
524func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
525 sc.lockTopicToConsumerChannelMap.Lock()
526 defer sc.lockTopicToConsumerChannelMap.Unlock()
527 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
528 delete(sc.topicToConsumerChannelMap, id)
529 }
530}
531
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500532func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500533 sc.lockTopicToConsumerChannelMap.Lock()
534 defer sc.lockTopicToConsumerChannelMap.Unlock()
535
536 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
537 return consumerCh
538 }
539 return nil
540}
541
khenaidoo79232702018-12-04 11:00:41 -0500542func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500543 sc.lockTopicToConsumerChannelMap.Lock()
544 defer sc.lockTopicToConsumerChannelMap.Unlock()
545 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
546 consumerCh.channels = append(consumerCh.channels, ch)
547 return
548 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500549 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
550}
551
552//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
553func closeConsumers(consumers []interface{}) error {
554 var err error
555 for _, consumer := range consumers {
556 // Is it a partition consumers?
557 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
558 if errTemp := partionConsumer.Close(); errTemp != nil {
559 log.Debugw("partition!!!", log.Fields{"err": errTemp})
560 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
561 // This can occur on race condition
562 err = nil
563 } else {
564 err = errTemp
565 }
566 }
567 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
568 if errTemp := groupConsumer.Close(); errTemp != nil {
569 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
570 // This can occur on race condition
571 err = nil
572 } else {
573 err = errTemp
574 }
575 }
576 }
577 }
578 return err
khenaidoo43c82122018-11-22 18:38:28 -0500579}
580
khenaidoo79232702018-12-04 11:00:41 -0500581func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500582 sc.lockTopicToConsumerChannelMap.Lock()
583 defer sc.lockTopicToConsumerChannelMap.Unlock()
584 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
585 // Channel will be closed in the removeChannel method
586 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500587 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500588 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500589 log.Debugw("closing-consumers", log.Fields{"topic": topic})
590 err := closeConsumers(consumerCh.consumers)
591 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500592 delete(sc.topicToConsumerChannelMap, topic.Name)
593 return err
594 }
595 return nil
596 }
597 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
598 return errors.New("topic-does-not-exist")
599}
600
601func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
602 sc.lockTopicToConsumerChannelMap.Lock()
603 defer sc.lockTopicToConsumerChannelMap.Unlock()
604 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
605 for _, ch := range consumerCh.channels {
606 // Channel will be closed in the removeChannel method
607 removeChannel(consumerCh.channels, ch)
608 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500609 err := closeConsumers(consumerCh.consumers)
610 //if err == sarama.ErrUnknownTopicOrPartition {
611 // // Not an error
612 // err = nil
613 //}
614 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500615 delete(sc.topicToConsumerChannelMap, topic.Name)
616 return err
617 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500618 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
619 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500620}
621
622func (sc *SaramaClient) clearConsumerChannelMap() error {
623 sc.lockTopicToConsumerChannelMap.Lock()
624 defer sc.lockTopicToConsumerChannelMap.Unlock()
625 var err error
626 for topic, consumerCh := range sc.topicToConsumerChannelMap {
627 for _, ch := range consumerCh.channels {
628 // Channel will be closed in the removeChannel method
629 removeChannel(consumerCh.channels, ch)
630 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500631 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
632 err = errTemp
633 }
634 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500635 delete(sc.topicToConsumerChannelMap, topic)
636 }
637 return err
638}
639
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500640//createPublisher creates the publisher which is used to send a message onto kafka
641func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500642 // This Creates the publisher
643 config := sarama.NewConfig()
644 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500645 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
646 config.Producer.Flush.Messages = sc.producerFlushMessages
647 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
648 config.Producer.Return.Errors = sc.producerReturnErrors
649 config.Producer.Return.Successes = sc.producerReturnSuccess
650 //config.Producer.RequiredAcks = sarama.WaitForAll
651 config.Producer.RequiredAcks = sarama.WaitForLocal
652
khenaidoo43c82122018-11-22 18:38:28 -0500653 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
654 brokers := []string{kafkaFullAddr}
655
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500656 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
657 log.Errorw("error-starting-publisher", log.Fields{"error": err})
658 return err
659 } else {
660 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500661 }
662 log.Info("Kafka-publisher-created")
663 return nil
664}
665
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500666func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500667 config := sarama.NewConfig()
668 config.Consumer.Return.Errors = true
669 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500670 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
671 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500672 config.Consumer.Offsets.Initial = sarama.OffsetNewest
673 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
674 brokers := []string{kafkaFullAddr}
675
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500676 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
677 log.Errorw("error-starting-consumers", log.Fields{"error": err})
678 return err
679 } else {
680 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500681 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500682 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500683 return nil
684}
685
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500686// createGroupConsumer creates a consumers group
khenaidooca301322019-01-09 23:06:32 -0500687func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500688 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500689 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500690 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500691 //config.Consumer.Return.Errors = true
692 //config.Group.Return.Notifications = false
693 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
694 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500695 config.Consumer.Offsets.Initial = sarama.OffsetNewest
khenaidooca301322019-01-09 23:06:32 -0500696 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500697 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
698 brokers := []string{kafkaFullAddr}
699
khenaidoo43c82122018-11-22 18:38:28 -0500700 topics := []string{topic.Name}
701 var consumer *scc.Consumer
702 var err error
703
khenaidooca301322019-01-09 23:06:32 -0500704 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
705 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500706 return nil, err
707 }
khenaidooca301322019-01-09 23:06:32 -0500708 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500709
710 //sc.groupConsumers[topic.Name] = consumer
711 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500712 return consumer, nil
713}
714
khenaidoo43c82122018-11-22 18:38:28 -0500715// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
716// topic via the unique channel each subsciber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500717func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500718 // Need to go over all channels and publish messages to them - do we need to copy msg?
719 sc.lockTopicToConsumerChannelMap.Lock()
720 defer sc.lockTopicToConsumerChannelMap.Unlock()
721 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500722 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500723 c <- protoMessage
724 }(ch)
725 }
726}
727
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500728func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
729 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500730startloop:
731 for {
732 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500733 case err, ok := <-consumer.Errors():
734 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500735 log.Warnw("partition-consumers-error", log.Fields{"error": err})
736 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500737 // Channel is closed
738 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500739 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500740 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500741 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500742 if !ok {
743 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500744 break startloop
745 }
746 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500747 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500748 if err := proto.Unmarshal(msgBody, icm); err != nil {
749 log.Warnw("partition-invalid-message", log.Fields{"error": err})
750 continue
751 }
752 go sc.dispatchToConsumers(consumerChnls, icm)
753 case <-sc.doneCh:
754 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
755 break startloop
756 }
757 }
758 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
759}
760
761func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
762 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
763
764startloop:
765 for {
766 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500767 case err, ok := <-consumer.Errors():
768 if ok {
khenaidooca301322019-01-09 23:06:32 -0500769 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500770 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500771 // channel is closed
772 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500773 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500774 case msg, ok := <-consumer.Messages():
775 if !ok {
776 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500777 break startloop
778 }
khenaidooca301322019-01-09 23:06:32 -0500779 log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500780 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500781 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500782 if err := proto.Unmarshal(msgBody, icm); err != nil {
783 log.Warnw("invalid-message", log.Fields{"error": err})
784 continue
785 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500786 go sc.dispatchToConsumers(consumerChnls, icm)
787 consumer.MarkOffset(msg, "")
788 case ntf := <-consumer.Notifications():
789 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500790 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500791 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500792 break startloop
793 }
794 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500795 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500796}
797
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500798func (sc *SaramaClient) startConsumers(topic *Topic) error {
799 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
800 var consumerCh *consumerChannels
801 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
802 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
803 return errors.New("consumers-not-exist")
804 }
805 // For each consumer listening for that topic, start a consumption loop
806 for _, consumer := range consumerCh.consumers {
807 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
808 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
809 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
810 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
811 } else {
812 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
813 return errors.New("invalid-consumer")
814 }
815 }
816 return nil
817}
818
819//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
820//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500821func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500822 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500823 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500824
khenaidoo7ff26c72019-01-16 14:55:48 -0500825 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500826 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500827 return nil, err
828 }
829
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500830 consumersIf := make([]interface{}, 0)
831 for _, pConsumer := range pConsumers {
832 consumersIf = append(consumersIf, pConsumer)
833 }
834
835 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500836 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500837 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500838 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500839 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500840 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500841 }
842
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500843 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500844 sc.addTopicToConsumerChannelMap(topic.Name, cc)
845
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500846 //Start a consumers to listen on that specific topic
847 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500848
849 return consumerListeningChannel, nil
850}
851
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500852// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
853// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500854func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500855 // TODO: Replace this development partition consumers with a group consumers
856 var pConsumer *scc.Consumer
857 var err error
khenaidooca301322019-01-09 23:06:32 -0500858 if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500859 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
860 return nil, err
861 }
862 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
863 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500864 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500865 cc := &consumerChannels{
866 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500867 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500868 }
869
870 // Add the consumers channel to the map
871 sc.addTopicToConsumerChannelMap(topic.Name, cc)
872
873 //Start a consumers to listen on that specific topic
874 go sc.startConsumers(topic)
875
876 return consumerListeningChannel, nil
877}
878
khenaidoo7ff26c72019-01-16 14:55:48 -0500879func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500880 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500881 partitionList, err := sc.consumer.Partitions(topic.Name)
882 if err != nil {
883 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
884 return nil, err
885 }
886
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500887 pConsumers := make([]sarama.PartitionConsumer, 0)
888 for _, partition := range partitionList {
889 var pConsumer sarama.PartitionConsumer
890 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
891 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
892 return nil, err
893 }
894 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500895 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500896 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500897}
898
khenaidoo79232702018-12-04 11:00:41 -0500899func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500900 var i int
khenaidoo79232702018-12-04 11:00:41 -0500901 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500902 for i, channel = range channels {
903 if channel == ch {
904 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
905 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500906 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500907 return channels[:len(channels)-1]
908 }
909 }
910 return channels
911}
khenaidoo7ff26c72019-01-16 14:55:48 -0500912
913
914func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
915 sc.lockOfGroupConsumers.Lock()
916 defer sc.lockOfGroupConsumers.Unlock()
917 if _, exist := sc.groupConsumers[topic]; !exist {
918 sc.groupConsumers[topic] = consumer
919 }
920}
921
922func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
923 sc.lockOfGroupConsumers.Lock()
924 defer sc.lockOfGroupConsumers.Unlock()
925 if _, exist := sc.groupConsumers[topic]; exist {
926 consumer := sc.groupConsumers[topic]
927 delete(sc.groupConsumers, topic)
928 if err := consumer.Close(); err!= nil {
929 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
930 return err
931 }
932 }
933 return nil
934}