blob: 8037002e6df7d60b018c7921b9905b0dca95212c [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
Abhilash S.L294ff522019-06-26 18:14:33 +053021 "strings"
22 "sync"
23 "time"
24
khenaidoo43c82122018-11-22 18:38:28 -050025 scc "github.com/bsm/sarama-cluster"
26 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050027 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050028 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050029 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050030 "gopkg.in/Shopify/sarama.v1"
khenaidoo43c82122018-11-22 18:38:28 -050031)
32
khenaidoo4c1a5bf2018-11-29 15:53:42 -050033func init() {
khenaidooca301322019-01-09 23:06:32 -050034 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050035}
36
37type returnErrorFunction func() error
38
39// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
40// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
41//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050042type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050043 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050044 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050045}
46
47// SaramaClient represents the messaging proxy
48type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050049 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050050 client sarama.Client
51 KafkaHost string
52 KafkaPort int
53 producer sarama.AsyncProducer
54 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050055 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040056 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 consumerType int
khenaidooca301322019-01-09 23:06:32 -050059 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050060 producerFlushFrequency int
61 producerFlushMessages int
62 producerFlushMaxmessages int
63 producerRetryMax int
64 producerRetryBackOff time.Duration
65 producerReturnSuccess bool
66 producerReturnErrors bool
67 consumerMaxwait int
68 maxProcessingTime int
69 numPartitions int
70 numReplicas int
71 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050072 doneCh chan int
73 topicToConsumerChannelMap map[string]*consumerChannels
74 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050075 topicLockMap map[string]*sync.RWMutex
76 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053077 metadataMaxRetry int
khenaidoo43c82122018-11-22 18:38:28 -050078}
79
80type SaramaClientOption func(*SaramaClient)
81
82func Host(host string) SaramaClientOption {
83 return func(args *SaramaClient) {
84 args.KafkaHost = host
85 }
86}
87
88func Port(port int) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaPort = port
91 }
92}
93
khenaidooca301322019-01-09 23:06:32 -050094func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
khenaidoo90847922018-12-03 14:47:51 -0500130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
khenaidoo90847922018-12-03 14:47:51 -0500148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
Abhilash S.L294ff522019-06-26 18:14:33 +0530184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
khenaidoo43c82122018-11-22 18:38:28 -0500190func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
191 client := &SaramaClient{
192 KafkaHost: DefaultKafkaHost,
193 KafkaPort: DefaultKafkaPort,
194 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500195 client.consumerType = DefaultConsumerType
196 client.producerFlushFrequency = DefaultProducerFlushFrequency
197 client.producerFlushMessages = DefaultProducerFlushMessages
198 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
199 client.producerReturnErrors = DefaultProducerReturnErrors
200 client.producerReturnSuccess = DefaultProducerReturnSuccess
201 client.producerRetryMax = DefaultProducerRetryMax
202 client.producerRetryBackOff = DefaultProducerRetryBackoff
203 client.consumerMaxwait = DefaultConsumerMaxwait
204 client.maxProcessingTime = DefaultMaxProcessingTime
205 client.numPartitions = DefaultNumberPartitions
206 client.numReplicas = DefaultNumberReplicas
207 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530208 client.metadataMaxRetry = DefaultMetadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500209
210 for _, option := range opts {
211 option(client)
212 }
213
khenaidooca301322019-01-09 23:06:32 -0500214 client.groupConsumers = make(map[string]*scc.Consumer)
215
khenaidoo43c82122018-11-22 18:38:28 -0500216 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500217 client.topicLockMap = make(map[string]*sync.RWMutex)
218 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500219 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500220 return client
221}
222
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500223func (sc *SaramaClient) Start() error {
224 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500225
226 // Create the Done channel
227 sc.doneCh = make(chan int, 1)
228
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500229 var err error
230
khenaidoob3244212019-08-27 14:32:27 -0400231 // Add a cleanup in case of failure to startup
232 defer func() {
233 if err != nil {
234 sc.Stop()
235 }
236 }()
237
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500238 // Create the Cluster Admin
239 if err = sc.createClusterAdmin(); err != nil {
240 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
241 return err
242 }
243
khenaidoo43c82122018-11-22 18:38:28 -0500244 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500245 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500246 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
247 return err
248 }
249
khenaidooca301322019-01-09 23:06:32 -0500250 if sc.consumerType == DefaultConsumerType {
251 // Create the master consumers
252 if err := sc.createConsumer(); err != nil {
253 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
254 return err
255 }
khenaidoo43c82122018-11-22 18:38:28 -0500256 }
257
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500258 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500259 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
260
khenaidooca301322019-01-09 23:06:32 -0500261 log.Info("kafka-sarama-client-started")
262
khenaidoo43c82122018-11-22 18:38:28 -0500263 return nil
264}
265
266func (sc *SaramaClient) Stop() {
267 log.Info("stopping-sarama-client")
268
269 //Send a message over the done channel to close all long running routines
270 sc.doneCh <- 1
271
khenaidoo43c82122018-11-22 18:38:28 -0500272 if sc.producer != nil {
273 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500274 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500275 }
276 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500277
khenaidoo43c82122018-11-22 18:38:28 -0500278 if sc.consumer != nil {
279 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500280 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500281 }
282 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500283
khenaidooca301322019-01-09 23:06:32 -0500284 for key, val := range sc.groupConsumers {
285 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
286 if err := val.Close(); err != nil {
287 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500288 }
289 }
290
291 if sc.cAdmin != nil {
292 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500293 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500294 }
295 }
296
297 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500298 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500299
300 log.Info("sarama-client-stopped")
301}
302
khenaidooca301322019-01-09 23:06:32 -0500303//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
304// the invoking function must hold the lock
305func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500306 // Set the topic details
307 topicDetail := &sarama.TopicDetail{}
308 topicDetail.NumPartitions = int32(numPartition)
309 topicDetail.ReplicationFactor = int16(repFactor)
310 topicDetail.ConfigEntries = make(map[string]*string)
311 topicDetails := make(map[string]*sarama.TopicDetail)
312 topicDetails[topic.Name] = topicDetail
313
314 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
315 if err == sarama.ErrTopicAlreadyExists {
316 // Not an error
317 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
318 return nil
319 }
320 log.Errorw("create-topic-failure", log.Fields{"error": err})
321 return err
322 }
323 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
324 // do so.
325 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
326 return nil
327}
328
khenaidooca301322019-01-09 23:06:32 -0500329//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
330// ensure no two go routines are performing operations on the same topic
331func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
332 sc.lockTopic(topic)
333 defer sc.unLockTopic(topic)
334
335 return sc.createTopic(topic, numPartition, repFactor)
336}
337
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500338//DeleteTopic removes a topic from the kafka Broker
339func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500340 sc.lockTopic(topic)
341 defer sc.unLockTopic(topic)
342
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500343 // Remove the topic from the broker
344 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
345 if err == sarama.ErrUnknownTopicOrPartition {
346 // Not an error as does not exist
347 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
348 return nil
349 }
350 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
351 return err
352 }
353
354 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
355 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
356 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
357 return err
358 }
359 return nil
360}
361
362// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
363// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500364func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500365 sc.lockTopic(topic)
366 defer sc.unLockTopic(topic)
367
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500368 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
369
370 // If a consumers already exist for that topic then resuse it
371 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
372 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
373 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500374 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500375 sc.addChannelToConsumerChannelMap(topic, ch)
376 return ch, nil
377 }
378
379 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500380 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500381 var err error
382
383 // Use the consumerType option to figure out the type of consumer to launch
384 if sc.consumerType == PartitionConsumer {
385 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500386 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500387 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
388 return nil, err
389 }
390 }
khenaidoo731697e2019-01-29 16:03:29 -0500391 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500392 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
393 return nil, err
394 }
395 } else if sc.consumerType == GroupCustomer {
396 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
397 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500398 //if sc.autoCreateTopic {
399 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
400 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
401 // return nil, err
402 // }
403 //}
404 //groupId := sc.consumerGroupName
405 groupId := getGroupId(kvArgs...)
406 // Include the group prefix
407 if groupId != "" {
408 groupId = sc.consumerGroupPrefix + groupId
409 } else {
410 // Need to use a unique group Id per topic
411 groupId = sc.consumerGroupPrefix + topic.Name
412 }
khenaidoo731697e2019-01-29 16:03:29 -0500413 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500414 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500415 return nil, err
416 }
khenaidooca301322019-01-09 23:06:32 -0500417
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500418 } else {
419 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
420 return nil, errors.New("unknown-consumer-type")
421 }
422
423 return consumerListeningChannel, nil
424}
425
426//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500427func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500428 sc.lockTopic(topic)
429 defer sc.unLockTopic(topic)
430
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500431 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500432 var err error
433 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
434 log.Errorw("failed-removing-channel", log.Fields{"error": err})
435 }
436 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
437 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
438 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500439 return err
440}
441
442// send formats and sends the request onto the kafka messaging bus.
443func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
444
445 // Assert message is a proto message
446 var protoMsg proto.Message
447 var ok bool
448 // ascertain the value interface type is a proto.Message
449 if protoMsg, ok = msg.(proto.Message); !ok {
450 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
451 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
452 }
453
454 var marshalled []byte
455 var err error
456 // Create the Sarama producer message
457 if marshalled, err = proto.Marshal(protoMsg); err != nil {
458 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
459 return err
460 }
461 key := ""
462 if len(keys) > 0 {
463 key = keys[0] // Only the first key is relevant
464 }
465 kafkaMsg := &sarama.ProducerMessage{
466 Topic: topic.Name,
467 Key: sarama.StringEncoder(key),
468 Value: sarama.ByteEncoder(marshalled),
469 }
470
471 // Send message to kafka
472 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500473 // Wait for result
474 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
475 select {
476 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500477 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
khenaidoo90847922018-12-03 14:47:51 -0500478 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500479 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500480 return notOk
481 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500482 return nil
483}
484
khenaidooca301322019-01-09 23:06:32 -0500485// getGroupId returns the group id from the key-value args.
486func getGroupId(kvArgs ...*KVArg) string {
487 for _, arg := range kvArgs {
488 if arg.Key == GroupIdKey {
489 return arg.Value.(string)
490 }
491 }
492 return ""
493}
494
khenaidoo731697e2019-01-29 16:03:29 -0500495// getOffset returns the offset from the key-value args.
496func getOffset(kvArgs ...*KVArg) int64 {
497 for _, arg := range kvArgs {
498 if arg.Key == Offset {
499 return arg.Value.(int64)
500 }
501 }
502 return sarama.OffsetNewest
503}
504
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500505func (sc *SaramaClient) createClusterAdmin() error {
506 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
507 config := sarama.NewConfig()
508 config.Version = sarama.V1_0_0_0
509
510 // Create a cluster Admin
511 var cAdmin sarama.ClusterAdmin
512 var err error
513 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
514 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
515 return err
516 }
517 sc.cAdmin = cAdmin
518 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500519}
520
khenaidood2b6df92018-12-13 16:37:20 -0500521func (sc *SaramaClient) lockTopic(topic *Topic) {
522 sc.lockOfTopicLockMap.Lock()
523 if _, exist := sc.topicLockMap[topic.Name]; exist {
524 sc.lockOfTopicLockMap.Unlock()
525 sc.topicLockMap[topic.Name].Lock()
526 } else {
527 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
528 sc.lockOfTopicLockMap.Unlock()
529 sc.topicLockMap[topic.Name].Lock()
530 }
531}
532
533func (sc *SaramaClient) unLockTopic(topic *Topic) {
534 sc.lockOfTopicLockMap.Lock()
535 defer sc.lockOfTopicLockMap.Unlock()
536 if _, exist := sc.topicLockMap[topic.Name]; exist {
537 sc.topicLockMap[topic.Name].Unlock()
538 }
539}
540
khenaidoo43c82122018-11-22 18:38:28 -0500541func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
542 sc.lockTopicToConsumerChannelMap.Lock()
543 defer sc.lockTopicToConsumerChannelMap.Unlock()
544 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
545 sc.topicToConsumerChannelMap[id] = arg
546 }
547}
548
549func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
550 sc.lockTopicToConsumerChannelMap.Lock()
551 defer sc.lockTopicToConsumerChannelMap.Unlock()
552 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
553 delete(sc.topicToConsumerChannelMap, id)
554 }
555}
556
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500557func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400558 sc.lockTopicToConsumerChannelMap.RLock()
559 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500560
561 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
562 return consumerCh
563 }
564 return nil
565}
566
khenaidoo79232702018-12-04 11:00:41 -0500567func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500568 sc.lockTopicToConsumerChannelMap.Lock()
569 defer sc.lockTopicToConsumerChannelMap.Unlock()
570 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
571 consumerCh.channels = append(consumerCh.channels, ch)
572 return
573 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500574 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
575}
576
577//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
578func closeConsumers(consumers []interface{}) error {
579 var err error
580 for _, consumer := range consumers {
581 // Is it a partition consumers?
582 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
583 if errTemp := partionConsumer.Close(); errTemp != nil {
584 log.Debugw("partition!!!", log.Fields{"err": errTemp})
585 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
586 // This can occur on race condition
587 err = nil
588 } else {
589 err = errTemp
590 }
591 }
592 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
593 if errTemp := groupConsumer.Close(); errTemp != nil {
594 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
595 // This can occur on race condition
596 err = nil
597 } else {
598 err = errTemp
599 }
600 }
601 }
602 }
603 return err
khenaidoo43c82122018-11-22 18:38:28 -0500604}
605
khenaidoo79232702018-12-04 11:00:41 -0500606func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500607 sc.lockTopicToConsumerChannelMap.Lock()
608 defer sc.lockTopicToConsumerChannelMap.Unlock()
609 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
610 // Channel will be closed in the removeChannel method
611 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500612 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500613 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500614 log.Debugw("closing-consumers", log.Fields{"topic": topic})
615 err := closeConsumers(consumerCh.consumers)
616 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500617 delete(sc.topicToConsumerChannelMap, topic.Name)
618 return err
619 }
620 return nil
621 }
622 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
623 return errors.New("topic-does-not-exist")
624}
625
626func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
627 sc.lockTopicToConsumerChannelMap.Lock()
628 defer sc.lockTopicToConsumerChannelMap.Unlock()
629 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
630 for _, ch := range consumerCh.channels {
631 // Channel will be closed in the removeChannel method
632 removeChannel(consumerCh.channels, ch)
633 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500634 err := closeConsumers(consumerCh.consumers)
635 //if err == sarama.ErrUnknownTopicOrPartition {
636 // // Not an error
637 // err = nil
638 //}
639 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500640 delete(sc.topicToConsumerChannelMap, topic.Name)
641 return err
642 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500643 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
644 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500645}
646
647func (sc *SaramaClient) clearConsumerChannelMap() error {
648 sc.lockTopicToConsumerChannelMap.Lock()
649 defer sc.lockTopicToConsumerChannelMap.Unlock()
650 var err error
651 for topic, consumerCh := range sc.topicToConsumerChannelMap {
652 for _, ch := range consumerCh.channels {
653 // Channel will be closed in the removeChannel method
654 removeChannel(consumerCh.channels, ch)
655 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500656 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
657 err = errTemp
658 }
659 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500660 delete(sc.topicToConsumerChannelMap, topic)
661 }
662 return err
663}
664
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500665//createPublisher creates the publisher which is used to send a message onto kafka
666func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500667 // This Creates the publisher
668 config := sarama.NewConfig()
669 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500670 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
671 config.Producer.Flush.Messages = sc.producerFlushMessages
672 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
673 config.Producer.Return.Errors = sc.producerReturnErrors
674 config.Producer.Return.Successes = sc.producerReturnSuccess
675 //config.Producer.RequiredAcks = sarama.WaitForAll
676 config.Producer.RequiredAcks = sarama.WaitForLocal
677
khenaidoo43c82122018-11-22 18:38:28 -0500678 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
679 brokers := []string{kafkaFullAddr}
680
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500681 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
682 log.Errorw("error-starting-publisher", log.Fields{"error": err})
683 return err
684 } else {
685 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500686 }
687 log.Info("Kafka-publisher-created")
688 return nil
689}
690
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500691func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500692 config := sarama.NewConfig()
693 config.Consumer.Return.Errors = true
694 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500695 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
696 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500697 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530698 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500699 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
700 brokers := []string{kafkaFullAddr}
701
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500702 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
703 log.Errorw("error-starting-consumers", log.Fields{"error": err})
704 return err
705 } else {
706 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500707 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500708 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500709 return nil
710}
711
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500712// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500713func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500714 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500715 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500716 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500717 //config.Consumer.Return.Errors = true
718 //config.Group.Return.Notifications = false
719 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
720 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500721 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500722 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500723 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
724 brokers := []string{kafkaFullAddr}
725
khenaidoo43c82122018-11-22 18:38:28 -0500726 topics := []string{topic.Name}
727 var consumer *scc.Consumer
728 var err error
729
khenaidooca301322019-01-09 23:06:32 -0500730 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
731 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500732 return nil, err
733 }
khenaidooca301322019-01-09 23:06:32 -0500734 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500735
736 //sc.groupConsumers[topic.Name] = consumer
737 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500738 return consumer, nil
739}
740
khenaidoo43c82122018-11-22 18:38:28 -0500741// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500742// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500743func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500744 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400745 sc.lockTopicToConsumerChannelMap.RLock()
746 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500747 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500748 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500749 c <- protoMessage
750 }(ch)
751 }
752}
753
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500754func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
755 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500756startloop:
757 for {
758 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500759 case err, ok := <-consumer.Errors():
760 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500761 log.Warnw("partition-consumers-error", log.Fields{"error": err})
762 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500763 // Channel is closed
764 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500765 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500766 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500767 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500768 if !ok {
769 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500770 break startloop
771 }
772 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500773 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500774 if err := proto.Unmarshal(msgBody, icm); err != nil {
775 log.Warnw("partition-invalid-message", log.Fields{"error": err})
776 continue
777 }
778 go sc.dispatchToConsumers(consumerChnls, icm)
779 case <-sc.doneCh:
780 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
781 break startloop
782 }
783 }
784 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
785}
786
787func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
788 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
789
790startloop:
791 for {
792 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500793 case err, ok := <-consumer.Errors():
794 if ok {
khenaidooca301322019-01-09 23:06:32 -0500795 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500796 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500797 // channel is closed
798 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500799 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500800 case msg, ok := <-consumer.Messages():
801 if !ok {
802 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500803 break startloop
804 }
khenaidoo297cd252019-02-07 22:10:23 -0500805 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500806 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500807 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500808 if err := proto.Unmarshal(msgBody, icm); err != nil {
809 log.Warnw("invalid-message", log.Fields{"error": err})
810 continue
811 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500812 go sc.dispatchToConsumers(consumerChnls, icm)
813 consumer.MarkOffset(msg, "")
814 case ntf := <-consumer.Notifications():
815 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500816 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500817 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500818 break startloop
819 }
820 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500821 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500822}
823
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500824func (sc *SaramaClient) startConsumers(topic *Topic) error {
825 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
826 var consumerCh *consumerChannels
827 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
828 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
829 return errors.New("consumers-not-exist")
830 }
831 // For each consumer listening for that topic, start a consumption loop
832 for _, consumer := range consumerCh.consumers {
833 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
834 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
835 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
836 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
837 } else {
838 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
839 return errors.New("invalid-consumer")
840 }
841 }
842 return nil
843}
844
845//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
846//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500847func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500848 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500849 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500850
khenaidoo7ff26c72019-01-16 14:55:48 -0500851 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500852 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500853 return nil, err
854 }
855
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500856 consumersIf := make([]interface{}, 0)
857 for _, pConsumer := range pConsumers {
858 consumersIf = append(consumersIf, pConsumer)
859 }
860
861 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500862 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500863 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500864 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500865 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500866 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500867 }
868
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500869 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500870 sc.addTopicToConsumerChannelMap(topic.Name, cc)
871
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500872 //Start a consumers to listen on that specific topic
873 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500874
875 return consumerListeningChannel, nil
876}
877
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500878// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
879// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500880func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500881 // TODO: Replace this development partition consumers with a group consumers
882 var pConsumer *scc.Consumer
883 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500884 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500885 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
886 return nil, err
887 }
888 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
889 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500890 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500891 cc := &consumerChannels{
892 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500893 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500894 }
895
896 // Add the consumers channel to the map
897 sc.addTopicToConsumerChannelMap(topic.Name, cc)
898
899 //Start a consumers to listen on that specific topic
900 go sc.startConsumers(topic)
901
902 return consumerListeningChannel, nil
903}
904
khenaidoo7ff26c72019-01-16 14:55:48 -0500905func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500906 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500907 partitionList, err := sc.consumer.Partitions(topic.Name)
908 if err != nil {
909 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
910 return nil, err
911 }
912
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500913 pConsumers := make([]sarama.PartitionConsumer, 0)
914 for _, partition := range partitionList {
915 var pConsumer sarama.PartitionConsumer
916 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
917 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
918 return nil, err
919 }
920 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500921 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500922 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500923}
924
khenaidoo79232702018-12-04 11:00:41 -0500925func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500926 var i int
khenaidoo79232702018-12-04 11:00:41 -0500927 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500928 for i, channel = range channels {
929 if channel == ch {
930 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
931 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500932 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500933 return channels[:len(channels)-1]
934 }
935 }
936 return channels
937}
khenaidoo7ff26c72019-01-16 14:55:48 -0500938
khenaidoo7ff26c72019-01-16 14:55:48 -0500939func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
940 sc.lockOfGroupConsumers.Lock()
941 defer sc.lockOfGroupConsumers.Unlock()
942 if _, exist := sc.groupConsumers[topic]; !exist {
943 sc.groupConsumers[topic] = consumer
944 }
945}
946
947func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
948 sc.lockOfGroupConsumers.Lock()
949 defer sc.lockOfGroupConsumers.Unlock()
950 if _, exist := sc.groupConsumers[topic]; exist {
951 consumer := sc.groupConsumers[topic]
952 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -0400953 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -0500954 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
955 return err
956 }
957 }
958 return nil
959}