blob: 55c68a3b8cda77cc92e8495cd03d55299c0e262f [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
// returnErrorFunction is a function that takes no argument and returns an error.
type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
khenaidoo7ff26c72019-01-16 14:55:48 -050055 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050056 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050057 consumerType int
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050071 doneCh chan int
72 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050076}
77
78type SaramaClientOption func(*SaramaClient)
79
80func Host(host string) SaramaClientOption {
81 return func(args *SaramaClient) {
82 args.KafkaHost = host
83 }
84}
85
86func Port(port int) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaPort = port
89 }
90}
91
khenaidooca301322019-01-09 23:06:32 -050092func ConsumerGroupPrefix(prefix string) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.consumerGroupPrefix = prefix
95 }
96}
97
98func ConsumerGroupName(name string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupName = name
101 }
102}
103
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500104func ConsumerType(consumer int) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerType = consumer
107 }
108}
109
110func ProducerFlushFrequency(frequency int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.producerFlushFrequency = frequency
113 }
114}
115
116func ProducerFlushMessages(num int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushMessages = num
119 }
120}
121
122func ProducerFlushMaxMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMaxmessages = num
125 }
126}
127
khenaidoo90847922018-12-03 14:47:51 -0500128func ProducerMaxRetries(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerRetryMax = num
131 }
132}
133
134func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryBackOff = duration
137 }
138}
139
140func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500141 return func(args *SaramaClient) {
142 args.producerReturnErrors = opt
143 }
144}
145
khenaidoo90847922018-12-03 14:47:51 -0500146func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500147 return func(args *SaramaClient) {
148 args.producerReturnSuccess = opt
149 }
150}
151
152func ConsumerMaxWait(wait int) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.consumerMaxwait = wait
155 }
156}
157
158func MaxProcessingTime(pTime int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.maxProcessingTime = pTime
161 }
162}
163
164func NumPartitions(number int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.numPartitions = number
167 }
168}
169
170func NumReplicas(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numReplicas = number
173 }
174}
175
176func AutoCreateTopic(opt bool) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.autoCreateTopic = opt
179 }
180}
181
khenaidoo43c82122018-11-22 18:38:28 -0500182func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
183 client := &SaramaClient{
184 KafkaHost: DefaultKafkaHost,
185 KafkaPort: DefaultKafkaPort,
186 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500187 client.consumerType = DefaultConsumerType
188 client.producerFlushFrequency = DefaultProducerFlushFrequency
189 client.producerFlushMessages = DefaultProducerFlushMessages
190 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
191 client.producerReturnErrors = DefaultProducerReturnErrors
192 client.producerReturnSuccess = DefaultProducerReturnSuccess
193 client.producerRetryMax = DefaultProducerRetryMax
194 client.producerRetryBackOff = DefaultProducerRetryBackoff
195 client.consumerMaxwait = DefaultConsumerMaxwait
196 client.maxProcessingTime = DefaultMaxProcessingTime
197 client.numPartitions = DefaultNumberPartitions
198 client.numReplicas = DefaultNumberReplicas
199 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500200
201 for _, option := range opts {
202 option(client)
203 }
204
khenaidooca301322019-01-09 23:06:32 -0500205 client.groupConsumers = make(map[string]*scc.Consumer)
206
khenaidoo43c82122018-11-22 18:38:28 -0500207 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500208 client.topicLockMap = make(map[string]*sync.RWMutex)
209 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500210 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500211 return client
212}
213
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500214func (sc *SaramaClient) Start() error {
215 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500216
217 // Create the Done channel
218 sc.doneCh = make(chan int, 1)
219
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500220 var err error
221
222 // Create the Cluster Admin
223 if err = sc.createClusterAdmin(); err != nil {
224 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
225 return err
226 }
227
khenaidoo43c82122018-11-22 18:38:28 -0500228 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500229 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500230 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
231 return err
232 }
233
khenaidooca301322019-01-09 23:06:32 -0500234 if sc.consumerType == DefaultConsumerType {
235 // Create the master consumers
236 if err := sc.createConsumer(); err != nil {
237 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
238 return err
239 }
khenaidoo43c82122018-11-22 18:38:28 -0500240 }
241
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500242 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500243 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
244
khenaidooca301322019-01-09 23:06:32 -0500245 log.Info("kafka-sarama-client-started")
246
khenaidoo43c82122018-11-22 18:38:28 -0500247 return nil
248}
249
250func (sc *SaramaClient) Stop() {
251 log.Info("stopping-sarama-client")
252
253 //Send a message over the done channel to close all long running routines
254 sc.doneCh <- 1
255
khenaidoo43c82122018-11-22 18:38:28 -0500256 if sc.producer != nil {
257 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500258 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500259 }
260 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500261
khenaidoo43c82122018-11-22 18:38:28 -0500262 if sc.consumer != nil {
263 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500264 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500265 }
266 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500267
khenaidooca301322019-01-09 23:06:32 -0500268 for key, val := range sc.groupConsumers {
269 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
270 if err := val.Close(); err != nil {
271 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500272 }
273 }
274
275 if sc.cAdmin != nil {
276 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500277 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500278 }
279 }
280
281 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500282 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500283
284 log.Info("sarama-client-stopped")
285}
286
khenaidooca301322019-01-09 23:06:32 -0500287//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
288// the invoking function must hold the lock
289func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500290 // Set the topic details
291 topicDetail := &sarama.TopicDetail{}
292 topicDetail.NumPartitions = int32(numPartition)
293 topicDetail.ReplicationFactor = int16(repFactor)
294 topicDetail.ConfigEntries = make(map[string]*string)
295 topicDetails := make(map[string]*sarama.TopicDetail)
296 topicDetails[topic.Name] = topicDetail
297
298 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
299 if err == sarama.ErrTopicAlreadyExists {
300 // Not an error
301 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
302 return nil
303 }
304 log.Errorw("create-topic-failure", log.Fields{"error": err})
305 return err
306 }
307 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
308 // do so.
309 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
310 return nil
311}
312
khenaidooca301322019-01-09 23:06:32 -0500313//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
314// ensure no two go routines are performing operations on the same topic
315func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
316 sc.lockTopic(topic)
317 defer sc.unLockTopic(topic)
318
319 return sc.createTopic(topic, numPartition, repFactor)
320}
321
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322//DeleteTopic removes a topic from the kafka Broker
323func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500324 sc.lockTopic(topic)
325 defer sc.unLockTopic(topic)
326
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Remove the topic from the broker
328 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
329 if err == sarama.ErrUnknownTopicOrPartition {
330 // Not an error as does not exist
331 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
332 return nil
333 }
334 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
335 return err
336 }
337
338 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
339 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
340 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
341 return err
342 }
343 return nil
344}
345
346// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
347// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500348func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500349 sc.lockTopic(topic)
350 defer sc.unLockTopic(topic)
351
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500352 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
353
354 // If a consumers already exist for that topic then resuse it
355 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
356 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
357 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500358 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359 sc.addChannelToConsumerChannelMap(topic, ch)
360 return ch, nil
361 }
362
363 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500364 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500365 var err error
366
367 // Use the consumerType option to figure out the type of consumer to launch
368 if sc.consumerType == PartitionConsumer {
369 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500370 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500371 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
372 return nil, err
373 }
374 }
khenaidoo731697e2019-01-29 16:03:29 -0500375 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500376 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
377 return nil, err
378 }
379 } else if sc.consumerType == GroupCustomer {
380 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
381 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500382 //if sc.autoCreateTopic {
383 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
384 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
385 // return nil, err
386 // }
387 //}
388 //groupId := sc.consumerGroupName
389 groupId := getGroupId(kvArgs...)
390 // Include the group prefix
391 if groupId != "" {
392 groupId = sc.consumerGroupPrefix + groupId
393 } else {
394 // Need to use a unique group Id per topic
395 groupId = sc.consumerGroupPrefix + topic.Name
396 }
khenaidoo731697e2019-01-29 16:03:29 -0500397 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500398 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500399 return nil, err
400 }
khenaidooca301322019-01-09 23:06:32 -0500401
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 } else {
403 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
404 return nil, errors.New("unknown-consumer-type")
405 }
406
407 return consumerListeningChannel, nil
408}
409
410//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500411func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500412 sc.lockTopic(topic)
413 defer sc.unLockTopic(topic)
414
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500415 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500416 var err error
417 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
418 log.Errorw("failed-removing-channel", log.Fields{"error": err})
419 }
420 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
421 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
422 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500423 return err
424}
425
426// send formats and sends the request onto the kafka messaging bus.
427func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
428
429 // Assert message is a proto message
430 var protoMsg proto.Message
431 var ok bool
432 // ascertain the value interface type is a proto.Message
433 if protoMsg, ok = msg.(proto.Message); !ok {
434 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
435 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
436 }
437
438 var marshalled []byte
439 var err error
440 // Create the Sarama producer message
441 if marshalled, err = proto.Marshal(protoMsg); err != nil {
442 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
443 return err
444 }
445 key := ""
446 if len(keys) > 0 {
447 key = keys[0] // Only the first key is relevant
448 }
449 kafkaMsg := &sarama.ProducerMessage{
450 Topic: topic.Name,
451 Key: sarama.StringEncoder(key),
452 Value: sarama.ByteEncoder(marshalled),
453 }
454
455 // Send message to kafka
456 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500457
458 // Wait for result
459 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
460 select {
461 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500462 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500463 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500464 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500465 return notOk
466 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500467 return nil
468}
469
khenaidooca301322019-01-09 23:06:32 -0500470// getGroupId returns the group id from the key-value args.
471func getGroupId(kvArgs ...*KVArg) string {
472 for _, arg := range kvArgs {
473 if arg.Key == GroupIdKey {
474 return arg.Value.(string)
475 }
476 }
477 return ""
478}
479
khenaidoo731697e2019-01-29 16:03:29 -0500480// getOffset returns the offset from the key-value args.
481func getOffset(kvArgs ...*KVArg) int64 {
482 for _, arg := range kvArgs {
483 if arg.Key == Offset {
484 return arg.Value.(int64)
485 }
486 }
487 return sarama.OffsetNewest
488}
489
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500490func (sc *SaramaClient) createClusterAdmin() error {
491 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
492 config := sarama.NewConfig()
493 config.Version = sarama.V1_0_0_0
494
495 // Create a cluster Admin
496 var cAdmin sarama.ClusterAdmin
497 var err error
498 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
499 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
500 return err
501 }
502 sc.cAdmin = cAdmin
503 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500504}
505
khenaidood2b6df92018-12-13 16:37:20 -0500506func (sc *SaramaClient) lockTopic(topic *Topic) {
507 sc.lockOfTopicLockMap.Lock()
508 if _, exist := sc.topicLockMap[topic.Name]; exist {
509 sc.lockOfTopicLockMap.Unlock()
510 sc.topicLockMap[topic.Name].Lock()
511 } else {
512 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
513 sc.lockOfTopicLockMap.Unlock()
514 sc.topicLockMap[topic.Name].Lock()
515 }
516}
517
518func (sc *SaramaClient) unLockTopic(topic *Topic) {
519 sc.lockOfTopicLockMap.Lock()
520 defer sc.lockOfTopicLockMap.Unlock()
521 if _, exist := sc.topicLockMap[topic.Name]; exist {
522 sc.topicLockMap[topic.Name].Unlock()
523 }
524}
525
khenaidoo43c82122018-11-22 18:38:28 -0500526func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
527 sc.lockTopicToConsumerChannelMap.Lock()
528 defer sc.lockTopicToConsumerChannelMap.Unlock()
529 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
530 sc.topicToConsumerChannelMap[id] = arg
531 }
532}
533
534func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
535 sc.lockTopicToConsumerChannelMap.Lock()
536 defer sc.lockTopicToConsumerChannelMap.Unlock()
537 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
538 delete(sc.topicToConsumerChannelMap, id)
539 }
540}
541
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500542func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500543 sc.lockTopicToConsumerChannelMap.Lock()
544 defer sc.lockTopicToConsumerChannelMap.Unlock()
545
546 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
547 return consumerCh
548 }
549 return nil
550}
551
khenaidoo79232702018-12-04 11:00:41 -0500552func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500553 sc.lockTopicToConsumerChannelMap.Lock()
554 defer sc.lockTopicToConsumerChannelMap.Unlock()
555 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
556 consumerCh.channels = append(consumerCh.channels, ch)
557 return
558 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500559 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
560}
561
562//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
563func closeConsumers(consumers []interface{}) error {
564 var err error
565 for _, consumer := range consumers {
566 // Is it a partition consumers?
567 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
568 if errTemp := partionConsumer.Close(); errTemp != nil {
569 log.Debugw("partition!!!", log.Fields{"err": errTemp})
570 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
571 // This can occur on race condition
572 err = nil
573 } else {
574 err = errTemp
575 }
576 }
577 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
578 if errTemp := groupConsumer.Close(); errTemp != nil {
579 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
580 // This can occur on race condition
581 err = nil
582 } else {
583 err = errTemp
584 }
585 }
586 }
587 }
588 return err
khenaidoo43c82122018-11-22 18:38:28 -0500589}
590
khenaidoo79232702018-12-04 11:00:41 -0500591func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500592 sc.lockTopicToConsumerChannelMap.Lock()
593 defer sc.lockTopicToConsumerChannelMap.Unlock()
594 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
595 // Channel will be closed in the removeChannel method
596 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500597 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500598 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500599 log.Debugw("closing-consumers", log.Fields{"topic": topic})
600 err := closeConsumers(consumerCh.consumers)
601 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500602 delete(sc.topicToConsumerChannelMap, topic.Name)
603 return err
604 }
605 return nil
606 }
607 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
608 return errors.New("topic-does-not-exist")
609}
610
611func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
612 sc.lockTopicToConsumerChannelMap.Lock()
613 defer sc.lockTopicToConsumerChannelMap.Unlock()
614 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
615 for _, ch := range consumerCh.channels {
616 // Channel will be closed in the removeChannel method
617 removeChannel(consumerCh.channels, ch)
618 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500619 err := closeConsumers(consumerCh.consumers)
620 //if err == sarama.ErrUnknownTopicOrPartition {
621 // // Not an error
622 // err = nil
623 //}
624 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500625 delete(sc.topicToConsumerChannelMap, topic.Name)
626 return err
627 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500628 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
629 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500630}
631
632func (sc *SaramaClient) clearConsumerChannelMap() error {
633 sc.lockTopicToConsumerChannelMap.Lock()
634 defer sc.lockTopicToConsumerChannelMap.Unlock()
635 var err error
636 for topic, consumerCh := range sc.topicToConsumerChannelMap {
637 for _, ch := range consumerCh.channels {
638 // Channel will be closed in the removeChannel method
639 removeChannel(consumerCh.channels, ch)
640 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500641 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
642 err = errTemp
643 }
644 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500645 delete(sc.topicToConsumerChannelMap, topic)
646 }
647 return err
648}
649
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500650//createPublisher creates the publisher which is used to send a message onto kafka
651func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500652 // This Creates the publisher
653 config := sarama.NewConfig()
654 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500655 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
656 config.Producer.Flush.Messages = sc.producerFlushMessages
657 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
658 config.Producer.Return.Errors = sc.producerReturnErrors
659 config.Producer.Return.Successes = sc.producerReturnSuccess
660 //config.Producer.RequiredAcks = sarama.WaitForAll
661 config.Producer.RequiredAcks = sarama.WaitForLocal
662
khenaidoo43c82122018-11-22 18:38:28 -0500663 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
664 brokers := []string{kafkaFullAddr}
665
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500666 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
667 log.Errorw("error-starting-publisher", log.Fields{"error": err})
668 return err
669 } else {
670 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500671 }
672 log.Info("Kafka-publisher-created")
673 return nil
674}
675
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500676func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500677 config := sarama.NewConfig()
678 config.Consumer.Return.Errors = true
679 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500680 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
681 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500682 config.Consumer.Offsets.Initial = sarama.OffsetNewest
683 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
684 brokers := []string{kafkaFullAddr}
685
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500686 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
687 log.Errorw("error-starting-consumers", log.Fields{"error": err})
688 return err
689 } else {
690 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500691 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500692 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500693 return nil
694}
695
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500696// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500697func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500698 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500699 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500700 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500701 //config.Consumer.Return.Errors = true
702 //config.Group.Return.Notifications = false
703 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
704 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500705 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500706 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500707 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
708 brokers := []string{kafkaFullAddr}
709
khenaidoo43c82122018-11-22 18:38:28 -0500710 topics := []string{topic.Name}
711 var consumer *scc.Consumer
712 var err error
713
khenaidooca301322019-01-09 23:06:32 -0500714 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
715 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500716 return nil, err
717 }
khenaidooca301322019-01-09 23:06:32 -0500718 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500719
720 //sc.groupConsumers[topic.Name] = consumer
721 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500722 return consumer, nil
723}
724
khenaidoo43c82122018-11-22 18:38:28 -0500725// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500726// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500727func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500728 // Need to go over all channels and publish messages to them - do we need to copy msg?
729 sc.lockTopicToConsumerChannelMap.Lock()
730 defer sc.lockTopicToConsumerChannelMap.Unlock()
731 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500732 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500733 c <- protoMessage
734 }(ch)
735 }
736}
737
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500738func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
739 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500740startloop:
741 for {
742 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500743 case err, ok := <-consumer.Errors():
744 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500745 log.Warnw("partition-consumers-error", log.Fields{"error": err})
746 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500747 // Channel is closed
748 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500749 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500750 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500751 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500752 if !ok {
753 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500754 break startloop
755 }
756 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500757 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500758 if err := proto.Unmarshal(msgBody, icm); err != nil {
759 log.Warnw("partition-invalid-message", log.Fields{"error": err})
760 continue
761 }
762 go sc.dispatchToConsumers(consumerChnls, icm)
763 case <-sc.doneCh:
764 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
765 break startloop
766 }
767 }
768 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
769}
770
771func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
772 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
773
774startloop:
775 for {
776 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500777 case err, ok := <-consumer.Errors():
778 if ok {
khenaidooca301322019-01-09 23:06:32 -0500779 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500780 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500781 // channel is closed
782 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500783 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500784 case msg, ok := <-consumer.Messages():
785 if !ok {
786 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500787 break startloop
788 }
khenaidooca301322019-01-09 23:06:32 -0500789 log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500790 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500791 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500792 if err := proto.Unmarshal(msgBody, icm); err != nil {
793 log.Warnw("invalid-message", log.Fields{"error": err})
794 continue
795 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500796 go sc.dispatchToConsumers(consumerChnls, icm)
797 consumer.MarkOffset(msg, "")
798 case ntf := <-consumer.Notifications():
799 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500800 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500801 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500802 break startloop
803 }
804 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500805 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500806}
807
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500808func (sc *SaramaClient) startConsumers(topic *Topic) error {
809 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
810 var consumerCh *consumerChannels
811 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
812 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
813 return errors.New("consumers-not-exist")
814 }
815 // For each consumer listening for that topic, start a consumption loop
816 for _, consumer := range consumerCh.consumers {
817 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
818 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
819 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
820 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
821 } else {
822 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
823 return errors.New("invalid-consumer")
824 }
825 }
826 return nil
827}
828
829//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
830//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500831func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500832 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500833 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500834
khenaidoo7ff26c72019-01-16 14:55:48 -0500835 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500836 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500837 return nil, err
838 }
839
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500840 consumersIf := make([]interface{}, 0)
841 for _, pConsumer := range pConsumers {
842 consumersIf = append(consumersIf, pConsumer)
843 }
844
845 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500846 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500847 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500848 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500849 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500850 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500851 }
852
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500853 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500854 sc.addTopicToConsumerChannelMap(topic.Name, cc)
855
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500856 //Start a consumers to listen on that specific topic
857 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500858
859 return consumerListeningChannel, nil
860}
861
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500862// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
863// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500864func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500865 // TODO: Replace this development partition consumers with a group consumers
866 var pConsumer *scc.Consumer
867 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500868 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500869 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
870 return nil, err
871 }
872 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
873 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500874 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500875 cc := &consumerChannels{
876 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500877 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500878 }
879
880 // Add the consumers channel to the map
881 sc.addTopicToConsumerChannelMap(topic.Name, cc)
882
883 //Start a consumers to listen on that specific topic
884 go sc.startConsumers(topic)
885
886 return consumerListeningChannel, nil
887}
888
khenaidoo7ff26c72019-01-16 14:55:48 -0500889func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500890 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500891 partitionList, err := sc.consumer.Partitions(topic.Name)
892 if err != nil {
893 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
894 return nil, err
895 }
896
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500897 pConsumers := make([]sarama.PartitionConsumer, 0)
898 for _, partition := range partitionList {
899 var pConsumer sarama.PartitionConsumer
900 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
901 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
902 return nil, err
903 }
904 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500905 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500906 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500907}
908
khenaidoo79232702018-12-04 11:00:41 -0500909func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500910 var i int
khenaidoo79232702018-12-04 11:00:41 -0500911 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500912 for i, channel = range channels {
913 if channel == ch {
914 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
915 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500916 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500917 return channels[:len(channels)-1]
918 }
919 }
920 return channels
921}
khenaidoo7ff26c72019-01-16 14:55:48 -0500922
923
924func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
925 sc.lockOfGroupConsumers.Lock()
926 defer sc.lockOfGroupConsumers.Unlock()
927 if _, exist := sc.groupConsumers[topic]; !exist {
928 sc.groupConsumers[topic] = consumer
929 }
930}
931
932func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
933 sc.lockOfGroupConsumers.Lock()
934 defer sc.lockOfGroupConsumers.Unlock()
935 if _, exist := sc.groupConsumers[topic]; exist {
936 consumer := sc.groupConsumers[topic]
937 delete(sc.groupConsumers, topic)
938 if err := consumer.Close(); err!= nil {
939 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
940 return err
941 }
942 }
943 return nil
944}