blob: 9e3ce0c5d16826a50847b7d273d4abd3e2537a09 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
Scott Bakera9ebe332019-09-27 12:39:56 -070021 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050022 scc "github.com/bsm/sarama-cluster"
23 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050024 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050025 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050026 ic "github.com/opencord/voltha-protos/go/inter_container"
Scott Bakera9ebe332019-09-27 12:39:56 -070027 "strings"
28 "sync"
29 "time"
khenaidoo43c82122018-11-22 18:38:28 -050030)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
36type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040055 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050056 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050057 consumerType int
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050071 doneCh chan int
72 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053076 metadataMaxRetry int
khenaidoo43c82122018-11-22 18:38:28 -050077}
78
79type SaramaClientOption func(*SaramaClient)
80
81func Host(host string) SaramaClientOption {
82 return func(args *SaramaClient) {
83 args.KafkaHost = host
84 }
85}
86
87func Port(port int) SaramaClientOption {
88 return func(args *SaramaClient) {
89 args.KafkaPort = port
90 }
91}
92
khenaidooca301322019-01-09 23:06:32 -050093func ConsumerGroupPrefix(prefix string) SaramaClientOption {
94 return func(args *SaramaClient) {
95 args.consumerGroupPrefix = prefix
96 }
97}
98
99func ConsumerGroupName(name string) SaramaClientOption {
100 return func(args *SaramaClient) {
101 args.consumerGroupName = name
102 }
103}
104
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500105func ConsumerType(consumer int) SaramaClientOption {
106 return func(args *SaramaClient) {
107 args.consumerType = consumer
108 }
109}
110
111func ProducerFlushFrequency(frequency int) SaramaClientOption {
112 return func(args *SaramaClient) {
113 args.producerFlushFrequency = frequency
114 }
115}
116
117func ProducerFlushMessages(num int) SaramaClientOption {
118 return func(args *SaramaClient) {
119 args.producerFlushMessages = num
120 }
121}
122
123func ProducerFlushMaxMessages(num int) SaramaClientOption {
124 return func(args *SaramaClient) {
125 args.producerFlushMaxmessages = num
126 }
127}
128
khenaidoo90847922018-12-03 14:47:51 -0500129func ProducerMaxRetries(num int) SaramaClientOption {
130 return func(args *SaramaClient) {
131 args.producerRetryMax = num
132 }
133}
134
135func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
136 return func(args *SaramaClient) {
137 args.producerRetryBackOff = duration
138 }
139}
140
141func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500142 return func(args *SaramaClient) {
143 args.producerReturnErrors = opt
144 }
145}
146
khenaidoo90847922018-12-03 14:47:51 -0500147func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500148 return func(args *SaramaClient) {
149 args.producerReturnSuccess = opt
150 }
151}
152
153func ConsumerMaxWait(wait int) SaramaClientOption {
154 return func(args *SaramaClient) {
155 args.consumerMaxwait = wait
156 }
157}
158
159func MaxProcessingTime(pTime int) SaramaClientOption {
160 return func(args *SaramaClient) {
161 args.maxProcessingTime = pTime
162 }
163}
164
165func NumPartitions(number int) SaramaClientOption {
166 return func(args *SaramaClient) {
167 args.numPartitions = number
168 }
169}
170
171func NumReplicas(number int) SaramaClientOption {
172 return func(args *SaramaClient) {
173 args.numReplicas = number
174 }
175}
176
177func AutoCreateTopic(opt bool) SaramaClientOption {
178 return func(args *SaramaClient) {
179 args.autoCreateTopic = opt
180 }
181}
182
Abhilash S.L294ff522019-06-26 18:14:33 +0530183func MetadatMaxRetries(retry int) SaramaClientOption {
184 return func(args *SaramaClient) {
185 args.metadataMaxRetry = retry
186 }
187}
188
khenaidoo43c82122018-11-22 18:38:28 -0500189func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
190 client := &SaramaClient{
191 KafkaHost: DefaultKafkaHost,
192 KafkaPort: DefaultKafkaPort,
193 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500194 client.consumerType = DefaultConsumerType
195 client.producerFlushFrequency = DefaultProducerFlushFrequency
196 client.producerFlushMessages = DefaultProducerFlushMessages
197 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
198 client.producerReturnErrors = DefaultProducerReturnErrors
199 client.producerReturnSuccess = DefaultProducerReturnSuccess
200 client.producerRetryMax = DefaultProducerRetryMax
201 client.producerRetryBackOff = DefaultProducerRetryBackoff
202 client.consumerMaxwait = DefaultConsumerMaxwait
203 client.maxProcessingTime = DefaultMaxProcessingTime
204 client.numPartitions = DefaultNumberPartitions
205 client.numReplicas = DefaultNumberReplicas
206 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530207 client.metadataMaxRetry = DefaultMetadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500208
209 for _, option := range opts {
210 option(client)
211 }
212
khenaidooca301322019-01-09 23:06:32 -0500213 client.groupConsumers = make(map[string]*scc.Consumer)
214
khenaidoo43c82122018-11-22 18:38:28 -0500215 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500216 client.topicLockMap = make(map[string]*sync.RWMutex)
217 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500218 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500219 return client
220}
221
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500222func (sc *SaramaClient) Start() error {
223 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500224
225 // Create the Done channel
226 sc.doneCh = make(chan int, 1)
227
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500228 var err error
229
khenaidooacc718f2019-08-21 16:21:07 -0400230 // Add a cleanup in case of failure to startup
231 defer func() {
232 if err != nil {
233 sc.Stop()
234 }
235 }()
236
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500237 // Create the Cluster Admin
238 if err = sc.createClusterAdmin(); err != nil {
239 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
240 return err
241 }
242
khenaidoo43c82122018-11-22 18:38:28 -0500243 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500244 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500245 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
246 return err
247 }
248
khenaidooca301322019-01-09 23:06:32 -0500249 if sc.consumerType == DefaultConsumerType {
250 // Create the master consumers
251 if err := sc.createConsumer(); err != nil {
252 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
253 return err
254 }
khenaidoo43c82122018-11-22 18:38:28 -0500255 }
256
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500257 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500258 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
259
khenaidooca301322019-01-09 23:06:32 -0500260 log.Info("kafka-sarama-client-started")
261
khenaidoo43c82122018-11-22 18:38:28 -0500262 return nil
263}
264
265func (sc *SaramaClient) Stop() {
266 log.Info("stopping-sarama-client")
267
268 //Send a message over the done channel to close all long running routines
269 sc.doneCh <- 1
270
khenaidoo43c82122018-11-22 18:38:28 -0500271 if sc.producer != nil {
272 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500273 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500274 }
275 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500276
khenaidoo43c82122018-11-22 18:38:28 -0500277 if sc.consumer != nil {
278 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500279 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500280 }
281 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500282
khenaidooca301322019-01-09 23:06:32 -0500283 for key, val := range sc.groupConsumers {
284 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
285 if err := val.Close(); err != nil {
286 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500287 }
288 }
289
290 if sc.cAdmin != nil {
291 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500292 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500293 }
294 }
295
296 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500297 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298
299 log.Info("sarama-client-stopped")
300}
301
khenaidooca301322019-01-09 23:06:32 -0500302//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
303// the invoking function must hold the lock
304func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500305 // Set the topic details
306 topicDetail := &sarama.TopicDetail{}
307 topicDetail.NumPartitions = int32(numPartition)
308 topicDetail.ReplicationFactor = int16(repFactor)
309 topicDetail.ConfigEntries = make(map[string]*string)
310 topicDetails := make(map[string]*sarama.TopicDetail)
311 topicDetails[topic.Name] = topicDetail
312
313 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
314 if err == sarama.ErrTopicAlreadyExists {
315 // Not an error
316 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
317 return nil
318 }
319 log.Errorw("create-topic-failure", log.Fields{"error": err})
320 return err
321 }
322 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
323 // do so.
324 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
325 return nil
326}
327
khenaidooca301322019-01-09 23:06:32 -0500328//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
329// ensure no two go routines are performing operations on the same topic
330func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
331 sc.lockTopic(topic)
332 defer sc.unLockTopic(topic)
333
334 return sc.createTopic(topic, numPartition, repFactor)
335}
336
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500337//DeleteTopic removes a topic from the kafka Broker
338func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500339 sc.lockTopic(topic)
340 defer sc.unLockTopic(topic)
341
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500342 // Remove the topic from the broker
343 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
344 if err == sarama.ErrUnknownTopicOrPartition {
345 // Not an error as does not exist
346 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
347 return nil
348 }
349 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
350 return err
351 }
352
353 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
354 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
355 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
356 return err
357 }
358 return nil
359}
360
361// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
362// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500363func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500364 sc.lockTopic(topic)
365 defer sc.unLockTopic(topic)
366
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500367 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
368
369 // If a consumers already exist for that topic then resuse it
370 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
371 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
372 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500373 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500374 sc.addChannelToConsumerChannelMap(topic, ch)
375 return ch, nil
376 }
377
378 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500379 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500380 var err error
381
382 // Use the consumerType option to figure out the type of consumer to launch
383 if sc.consumerType == PartitionConsumer {
384 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500385 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500386 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
387 return nil, err
388 }
389 }
khenaidoo731697e2019-01-29 16:03:29 -0500390 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500391 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
392 return nil, err
393 }
394 } else if sc.consumerType == GroupCustomer {
395 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
396 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500397 //if sc.autoCreateTopic {
398 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
399 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
400 // return nil, err
401 // }
402 //}
403 //groupId := sc.consumerGroupName
404 groupId := getGroupId(kvArgs...)
405 // Include the group prefix
406 if groupId != "" {
407 groupId = sc.consumerGroupPrefix + groupId
408 } else {
409 // Need to use a unique group Id per topic
410 groupId = sc.consumerGroupPrefix + topic.Name
411 }
khenaidoo731697e2019-01-29 16:03:29 -0500412 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500413 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500414 return nil, err
415 }
khenaidooca301322019-01-09 23:06:32 -0500416
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500417 } else {
418 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
419 return nil, errors.New("unknown-consumer-type")
420 }
421
422 return consumerListeningChannel, nil
423}
424
425//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500426func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500427 sc.lockTopic(topic)
428 defer sc.unLockTopic(topic)
429
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500430 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500431 var err error
432 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
433 log.Errorw("failed-removing-channel", log.Fields{"error": err})
434 }
435 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
436 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
437 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500438 return err
439}
440
441// send formats and sends the request onto the kafka messaging bus.
442func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
443
444 // Assert message is a proto message
445 var protoMsg proto.Message
446 var ok bool
447 // ascertain the value interface type is a proto.Message
448 if protoMsg, ok = msg.(proto.Message); !ok {
449 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
450 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
451 }
452
453 var marshalled []byte
454 var err error
455 // Create the Sarama producer message
456 if marshalled, err = proto.Marshal(protoMsg); err != nil {
457 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
458 return err
459 }
460 key := ""
461 if len(keys) > 0 {
462 key = keys[0] // Only the first key is relevant
463 }
464 kafkaMsg := &sarama.ProducerMessage{
465 Topic: topic.Name,
466 Key: sarama.StringEncoder(key),
467 Value: sarama.ByteEncoder(marshalled),
468 }
469
470 // Send message to kafka
471 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500472 // Wait for result
473 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
474 select {
475 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500476 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
khenaidoo90847922018-12-03 14:47:51 -0500477 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500478 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500479 return notOk
480 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500481 return nil
482}
483
khenaidooca301322019-01-09 23:06:32 -0500484// getGroupId returns the group id from the key-value args.
485func getGroupId(kvArgs ...*KVArg) string {
486 for _, arg := range kvArgs {
487 if arg.Key == GroupIdKey {
488 return arg.Value.(string)
489 }
490 }
491 return ""
492}
493
khenaidoo731697e2019-01-29 16:03:29 -0500494// getOffset returns the offset from the key-value args.
495func getOffset(kvArgs ...*KVArg) int64 {
496 for _, arg := range kvArgs {
497 if arg.Key == Offset {
498 return arg.Value.(int64)
499 }
500 }
501 return sarama.OffsetNewest
502}
503
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500504func (sc *SaramaClient) createClusterAdmin() error {
505 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
506 config := sarama.NewConfig()
507 config.Version = sarama.V1_0_0_0
508
509 // Create a cluster Admin
510 var cAdmin sarama.ClusterAdmin
511 var err error
512 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
513 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
514 return err
515 }
516 sc.cAdmin = cAdmin
517 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500518}
519
khenaidood2b6df92018-12-13 16:37:20 -0500520func (sc *SaramaClient) lockTopic(topic *Topic) {
521 sc.lockOfTopicLockMap.Lock()
522 if _, exist := sc.topicLockMap[topic.Name]; exist {
523 sc.lockOfTopicLockMap.Unlock()
524 sc.topicLockMap[topic.Name].Lock()
525 } else {
526 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
527 sc.lockOfTopicLockMap.Unlock()
528 sc.topicLockMap[topic.Name].Lock()
529 }
530}
531
532func (sc *SaramaClient) unLockTopic(topic *Topic) {
533 sc.lockOfTopicLockMap.Lock()
534 defer sc.lockOfTopicLockMap.Unlock()
535 if _, exist := sc.topicLockMap[topic.Name]; exist {
536 sc.topicLockMap[topic.Name].Unlock()
537 }
538}
539
khenaidoo43c82122018-11-22 18:38:28 -0500540func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
541 sc.lockTopicToConsumerChannelMap.Lock()
542 defer sc.lockTopicToConsumerChannelMap.Unlock()
543 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
544 sc.topicToConsumerChannelMap[id] = arg
545 }
546}
547
548func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
549 sc.lockTopicToConsumerChannelMap.Lock()
550 defer sc.lockTopicToConsumerChannelMap.Unlock()
551 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
552 delete(sc.topicToConsumerChannelMap, id)
553 }
554}
555
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500556func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400557 sc.lockTopicToConsumerChannelMap.RLock()
558 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500559
560 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
561 return consumerCh
562 }
563 return nil
564}
565
khenaidoo79232702018-12-04 11:00:41 -0500566func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500567 sc.lockTopicToConsumerChannelMap.Lock()
568 defer sc.lockTopicToConsumerChannelMap.Unlock()
569 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
570 consumerCh.channels = append(consumerCh.channels, ch)
571 return
572 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500573 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
574}
575
576//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
577func closeConsumers(consumers []interface{}) error {
578 var err error
579 for _, consumer := range consumers {
580 // Is it a partition consumers?
581 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
582 if errTemp := partionConsumer.Close(); errTemp != nil {
583 log.Debugw("partition!!!", log.Fields{"err": errTemp})
584 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
585 // This can occur on race condition
586 err = nil
587 } else {
588 err = errTemp
589 }
590 }
591 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
592 if errTemp := groupConsumer.Close(); errTemp != nil {
593 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
594 // This can occur on race condition
595 err = nil
596 } else {
597 err = errTemp
598 }
599 }
600 }
601 }
602 return err
khenaidoo43c82122018-11-22 18:38:28 -0500603}
604
khenaidoo79232702018-12-04 11:00:41 -0500605func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500606 sc.lockTopicToConsumerChannelMap.Lock()
607 defer sc.lockTopicToConsumerChannelMap.Unlock()
608 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
609 // Channel will be closed in the removeChannel method
610 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500611 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500612 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500613 log.Debugw("closing-consumers", log.Fields{"topic": topic})
614 err := closeConsumers(consumerCh.consumers)
615 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500616 delete(sc.topicToConsumerChannelMap, topic.Name)
617 return err
618 }
619 return nil
620 }
621 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
622 return errors.New("topic-does-not-exist")
623}
624
625func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
626 sc.lockTopicToConsumerChannelMap.Lock()
627 defer sc.lockTopicToConsumerChannelMap.Unlock()
628 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
629 for _, ch := range consumerCh.channels {
630 // Channel will be closed in the removeChannel method
631 removeChannel(consumerCh.channels, ch)
632 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500633 err := closeConsumers(consumerCh.consumers)
634 //if err == sarama.ErrUnknownTopicOrPartition {
635 // // Not an error
636 // err = nil
637 //}
638 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500639 delete(sc.topicToConsumerChannelMap, topic.Name)
640 return err
641 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500642 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
643 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500644}
645
646func (sc *SaramaClient) clearConsumerChannelMap() error {
647 sc.lockTopicToConsumerChannelMap.Lock()
648 defer sc.lockTopicToConsumerChannelMap.Unlock()
649 var err error
650 for topic, consumerCh := range sc.topicToConsumerChannelMap {
651 for _, ch := range consumerCh.channels {
652 // Channel will be closed in the removeChannel method
653 removeChannel(consumerCh.channels, ch)
654 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500655 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
656 err = errTemp
657 }
658 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500659 delete(sc.topicToConsumerChannelMap, topic)
660 }
661 return err
662}
663
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500664//createPublisher creates the publisher which is used to send a message onto kafka
665func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500666 // This Creates the publisher
667 config := sarama.NewConfig()
668 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500669 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
670 config.Producer.Flush.Messages = sc.producerFlushMessages
671 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
672 config.Producer.Return.Errors = sc.producerReturnErrors
673 config.Producer.Return.Successes = sc.producerReturnSuccess
674 //config.Producer.RequiredAcks = sarama.WaitForAll
675 config.Producer.RequiredAcks = sarama.WaitForLocal
676
khenaidoo43c82122018-11-22 18:38:28 -0500677 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
678 brokers := []string{kafkaFullAddr}
679
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500680 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
681 log.Errorw("error-starting-publisher", log.Fields{"error": err})
682 return err
683 } else {
684 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500685 }
686 log.Info("Kafka-publisher-created")
687 return nil
688}
689
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500690func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500691 config := sarama.NewConfig()
692 config.Consumer.Return.Errors = true
693 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500694 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
695 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500696 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530697 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500698 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
699 brokers := []string{kafkaFullAddr}
700
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500701 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
702 log.Errorw("error-starting-consumers", log.Fields{"error": err})
703 return err
704 } else {
705 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500706 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500707 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500708 return nil
709}
710
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500711// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500712func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500713 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500714 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500715 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500716 //config.Consumer.Return.Errors = true
717 //config.Group.Return.Notifications = false
718 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
719 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500720 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500721 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500722 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
723 brokers := []string{kafkaFullAddr}
724
khenaidoo43c82122018-11-22 18:38:28 -0500725 topics := []string{topic.Name}
726 var consumer *scc.Consumer
727 var err error
728
khenaidooca301322019-01-09 23:06:32 -0500729 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
730 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500731 return nil, err
732 }
khenaidooca301322019-01-09 23:06:32 -0500733 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500734
735 //sc.groupConsumers[topic.Name] = consumer
736 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500737 return consumer, nil
738}
739
khenaidoo43c82122018-11-22 18:38:28 -0500740// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500741// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500742func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500743 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500746 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500747 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500748 c <- protoMessage
749 }(ch)
750 }
751}
752
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500753func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
754 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500755startloop:
756 for {
757 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500758 case err, ok := <-consumer.Errors():
759 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500760 log.Warnw("partition-consumers-error", log.Fields{"error": err})
761 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500762 // Channel is closed
763 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500764 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500765 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500766 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500767 if !ok {
768 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500769 break startloop
770 }
771 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500772 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500773 if err := proto.Unmarshal(msgBody, icm); err != nil {
774 log.Warnw("partition-invalid-message", log.Fields{"error": err})
775 continue
776 }
777 go sc.dispatchToConsumers(consumerChnls, icm)
778 case <-sc.doneCh:
779 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
780 break startloop
781 }
782 }
783 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
784}
785
786func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
787 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
788
789startloop:
790 for {
791 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500792 case err, ok := <-consumer.Errors():
793 if ok {
khenaidooca301322019-01-09 23:06:32 -0500794 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500795 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500796 // channel is closed
797 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500798 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500799 case msg, ok := <-consumer.Messages():
800 if !ok {
801 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500802 break startloop
803 }
khenaidoo297cd252019-02-07 22:10:23 -0500804 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500805 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500806 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500807 if err := proto.Unmarshal(msgBody, icm); err != nil {
808 log.Warnw("invalid-message", log.Fields{"error": err})
809 continue
810 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500811 go sc.dispatchToConsumers(consumerChnls, icm)
812 consumer.MarkOffset(msg, "")
813 case ntf := <-consumer.Notifications():
814 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500815 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500816 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500817 break startloop
818 }
819 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500820 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500821}
822
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500823func (sc *SaramaClient) startConsumers(topic *Topic) error {
824 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
825 var consumerCh *consumerChannels
826 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
827 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
828 return errors.New("consumers-not-exist")
829 }
830 // For each consumer listening for that topic, start a consumption loop
831 for _, consumer := range consumerCh.consumers {
832 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
833 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
834 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
835 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
836 } else {
837 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
838 return errors.New("invalid-consumer")
839 }
840 }
841 return nil
842}
843
844//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
845//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500846func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500847 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500848 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500849
khenaidoo7ff26c72019-01-16 14:55:48 -0500850 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500851 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500852 return nil, err
853 }
854
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500855 consumersIf := make([]interface{}, 0)
856 for _, pConsumer := range pConsumers {
857 consumersIf = append(consumersIf, pConsumer)
858 }
859
860 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500861 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500862 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500863 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500864 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500865 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500866 }
867
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500868 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500869 sc.addTopicToConsumerChannelMap(topic.Name, cc)
870
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500871 //Start a consumers to listen on that specific topic
872 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500873
874 return consumerListeningChannel, nil
875}
876
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500877// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
878// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500879func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500880 // TODO: Replace this development partition consumers with a group consumers
881 var pConsumer *scc.Consumer
882 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500883 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500884 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
885 return nil, err
886 }
887 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
888 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500889 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500890 cc := &consumerChannels{
891 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500892 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500893 }
894
895 // Add the consumers channel to the map
896 sc.addTopicToConsumerChannelMap(topic.Name, cc)
897
898 //Start a consumers to listen on that specific topic
899 go sc.startConsumers(topic)
900
901 return consumerListeningChannel, nil
902}
903
khenaidoo7ff26c72019-01-16 14:55:48 -0500904func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500905 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500906 partitionList, err := sc.consumer.Partitions(topic.Name)
907 if err != nil {
908 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
909 return nil, err
910 }
911
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500912 pConsumers := make([]sarama.PartitionConsumer, 0)
913 for _, partition := range partitionList {
914 var pConsumer sarama.PartitionConsumer
915 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
916 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
917 return nil, err
918 }
919 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500920 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500921 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500922}
923
khenaidoo79232702018-12-04 11:00:41 -0500924func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500925 var i int
khenaidoo79232702018-12-04 11:00:41 -0500926 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500927 for i, channel = range channels {
928 if channel == ch {
929 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
930 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500931 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500932 return channels[:len(channels)-1]
933 }
934 }
935 return channels
936}
khenaidoo7ff26c72019-01-16 14:55:48 -0500937
khenaidoo7ff26c72019-01-16 14:55:48 -0500938func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
939 sc.lockOfGroupConsumers.Lock()
940 defer sc.lockOfGroupConsumers.Unlock()
941 if _, exist := sc.groupConsumers[topic]; !exist {
942 sc.groupConsumers[topic] = consumer
943 }
944}
945
946func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
947 sc.lockOfGroupConsumers.Lock()
948 defer sc.lockOfGroupConsumers.Unlock()
949 if _, exist := sc.groupConsumers[topic]; exist {
950 consumer := sc.groupConsumers[topic]
951 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -0400952 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -0500953 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
954 return err
955 }
956 }
957 return nil
958}