blob: 0576da961d7004bfb60c42e643d29a9f309103bd [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
Abhilash S.L294ff522019-06-26 18:14:33 +053021 "strings"
22 "sync"
23 "time"
24
khenaidoo43c82122018-11-22 18:38:28 -050025 scc "github.com/bsm/sarama-cluster"
26 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050027 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050028 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050029 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050030 "gopkg.in/Shopify/sarama.v1"
khenaidoo43c82122018-11-22 18:38:28 -050031)
32
khenaidoo4c1a5bf2018-11-29 15:53:42 -050033func init() {
khenaidooca301322019-01-09 23:06:32 -050034 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050035}
36
37type returnErrorFunction func() error
38
39// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
40// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
41//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050042type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050043 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050044 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050045}
46
47// SaramaClient represents the messaging proxy
48type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050049 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050050 client sarama.Client
51 KafkaHost string
52 KafkaPort int
53 producer sarama.AsyncProducer
54 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050055 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040056 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 consumerType int
khenaidooca301322019-01-09 23:06:32 -050059 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050060 producerFlushFrequency int
61 producerFlushMessages int
62 producerFlushMaxmessages int
63 producerRetryMax int
64 producerRetryBackOff time.Duration
65 producerReturnSuccess bool
66 producerReturnErrors bool
67 consumerMaxwait int
68 maxProcessingTime int
69 numPartitions int
70 numReplicas int
71 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050072 doneCh chan int
73 topicToConsumerChannelMap map[string]*consumerChannels
74 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050075 topicLockMap map[string]*sync.RWMutex
76 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053077 metadataMaxRetry int
khenaidoo43c82122018-11-22 18:38:28 -050078}
79
80type SaramaClientOption func(*SaramaClient)
81
82func Host(host string) SaramaClientOption {
83 return func(args *SaramaClient) {
84 args.KafkaHost = host
85 }
86}
87
88func Port(port int) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaPort = port
91 }
92}
93
khenaidooca301322019-01-09 23:06:32 -050094func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
khenaidoo90847922018-12-03 14:47:51 -0500130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
khenaidoo90847922018-12-03 14:47:51 -0500148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
Abhilash S.L294ff522019-06-26 18:14:33 +0530184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
khenaidoo43c82122018-11-22 18:38:28 -0500190func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
191 client := &SaramaClient{
192 KafkaHost: DefaultKafkaHost,
193 KafkaPort: DefaultKafkaPort,
194 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500195 client.consumerType = DefaultConsumerType
196 client.producerFlushFrequency = DefaultProducerFlushFrequency
197 client.producerFlushMessages = DefaultProducerFlushMessages
198 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
199 client.producerReturnErrors = DefaultProducerReturnErrors
200 client.producerReturnSuccess = DefaultProducerReturnSuccess
201 client.producerRetryMax = DefaultProducerRetryMax
202 client.producerRetryBackOff = DefaultProducerRetryBackoff
203 client.consumerMaxwait = DefaultConsumerMaxwait
204 client.maxProcessingTime = DefaultMaxProcessingTime
205 client.numPartitions = DefaultNumberPartitions
206 client.numReplicas = DefaultNumberReplicas
207 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530208 client.metadataMaxRetry = DefaultMetadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500209
210 for _, option := range opts {
211 option(client)
212 }
213
khenaidooca301322019-01-09 23:06:32 -0500214 client.groupConsumers = make(map[string]*scc.Consumer)
215
khenaidoo43c82122018-11-22 18:38:28 -0500216 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500217 client.topicLockMap = make(map[string]*sync.RWMutex)
218 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500219 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500220 return client
221}
222
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500223func (sc *SaramaClient) Start() error {
224 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500225
226 // Create the Done channel
227 sc.doneCh = make(chan int, 1)
228
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500229 var err error
230
231 // Create the Cluster Admin
232 if err = sc.createClusterAdmin(); err != nil {
233 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
234 return err
235 }
236
khenaidoo43c82122018-11-22 18:38:28 -0500237 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500238 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500239 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
240 return err
241 }
242
khenaidooca301322019-01-09 23:06:32 -0500243 if sc.consumerType == DefaultConsumerType {
244 // Create the master consumers
245 if err := sc.createConsumer(); err != nil {
246 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
247 return err
248 }
khenaidoo43c82122018-11-22 18:38:28 -0500249 }
250
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500251 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500252 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
253
khenaidooca301322019-01-09 23:06:32 -0500254 log.Info("kafka-sarama-client-started")
255
khenaidoo43c82122018-11-22 18:38:28 -0500256 return nil
257}
258
259func (sc *SaramaClient) Stop() {
260 log.Info("stopping-sarama-client")
261
262 //Send a message over the done channel to close all long running routines
263 sc.doneCh <- 1
264
khenaidoo43c82122018-11-22 18:38:28 -0500265 if sc.producer != nil {
266 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500267 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500268 }
269 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500270
khenaidoo43c82122018-11-22 18:38:28 -0500271 if sc.consumer != nil {
272 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500273 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500274 }
275 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500276
khenaidooca301322019-01-09 23:06:32 -0500277 for key, val := range sc.groupConsumers {
278 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
279 if err := val.Close(); err != nil {
280 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500281 }
282 }
283
284 if sc.cAdmin != nil {
285 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500286 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500287 }
288 }
289
290 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500291 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500292
293 log.Info("sarama-client-stopped")
294}
295
khenaidooca301322019-01-09 23:06:32 -0500296//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
297// the invoking function must hold the lock
298func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500299 // Set the topic details
300 topicDetail := &sarama.TopicDetail{}
301 topicDetail.NumPartitions = int32(numPartition)
302 topicDetail.ReplicationFactor = int16(repFactor)
303 topicDetail.ConfigEntries = make(map[string]*string)
304 topicDetails := make(map[string]*sarama.TopicDetail)
305 topicDetails[topic.Name] = topicDetail
306
307 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
308 if err == sarama.ErrTopicAlreadyExists {
309 // Not an error
310 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
311 return nil
312 }
313 log.Errorw("create-topic-failure", log.Fields{"error": err})
314 return err
315 }
316 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
317 // do so.
318 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
319 return nil
320}
321
khenaidooca301322019-01-09 23:06:32 -0500322//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
323// ensure no two go routines are performing operations on the same topic
324func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
325 sc.lockTopic(topic)
326 defer sc.unLockTopic(topic)
327
328 return sc.createTopic(topic, numPartition, repFactor)
329}
330
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500331//DeleteTopic removes a topic from the kafka Broker
332func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500333 sc.lockTopic(topic)
334 defer sc.unLockTopic(topic)
335
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500336 // Remove the topic from the broker
337 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
338 if err == sarama.ErrUnknownTopicOrPartition {
339 // Not an error as does not exist
340 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
341 return nil
342 }
343 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
344 return err
345 }
346
347 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
348 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
349 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
350 return err
351 }
352 return nil
353}
354
355// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
356// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500357func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500358 sc.lockTopic(topic)
359 defer sc.unLockTopic(topic)
360
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500361 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
362
363 // If a consumers already exist for that topic then resuse it
364 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
365 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
366 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500367 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500368 sc.addChannelToConsumerChannelMap(topic, ch)
369 return ch, nil
370 }
371
372 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500373 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500374 var err error
375
376 // Use the consumerType option to figure out the type of consumer to launch
377 if sc.consumerType == PartitionConsumer {
378 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500379 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500380 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
381 return nil, err
382 }
383 }
khenaidoo731697e2019-01-29 16:03:29 -0500384 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500385 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
386 return nil, err
387 }
388 } else if sc.consumerType == GroupCustomer {
389 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
390 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500391 //if sc.autoCreateTopic {
392 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
393 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
394 // return nil, err
395 // }
396 //}
397 //groupId := sc.consumerGroupName
398 groupId := getGroupId(kvArgs...)
399 // Include the group prefix
400 if groupId != "" {
401 groupId = sc.consumerGroupPrefix + groupId
402 } else {
403 // Need to use a unique group Id per topic
404 groupId = sc.consumerGroupPrefix + topic.Name
405 }
khenaidoo731697e2019-01-29 16:03:29 -0500406 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500407 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500408 return nil, err
409 }
khenaidooca301322019-01-09 23:06:32 -0500410
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500411 } else {
412 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
413 return nil, errors.New("unknown-consumer-type")
414 }
415
416 return consumerListeningChannel, nil
417}
418
419//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500420func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500421 sc.lockTopic(topic)
422 defer sc.unLockTopic(topic)
423
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500424 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500425 var err error
426 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
427 log.Errorw("failed-removing-channel", log.Fields{"error": err})
428 }
429 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
430 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
431 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500432 return err
433}
434
435// send formats and sends the request onto the kafka messaging bus.
436func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
437
438 // Assert message is a proto message
439 var protoMsg proto.Message
440 var ok bool
441 // ascertain the value interface type is a proto.Message
442 if protoMsg, ok = msg.(proto.Message); !ok {
443 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
444 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
445 }
446
447 var marshalled []byte
448 var err error
449 // Create the Sarama producer message
450 if marshalled, err = proto.Marshal(protoMsg); err != nil {
451 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
452 return err
453 }
454 key := ""
455 if len(keys) > 0 {
456 key = keys[0] // Only the first key is relevant
457 }
458 kafkaMsg := &sarama.ProducerMessage{
459 Topic: topic.Name,
460 Key: sarama.StringEncoder(key),
461 Value: sarama.ByteEncoder(marshalled),
462 }
463
464 // Send message to kafka
465 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500466 // Wait for result
467 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
468 select {
469 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500470 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
khenaidoo90847922018-12-03 14:47:51 -0500471 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500472 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500473 return notOk
474 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500475 return nil
476}
477
khenaidooca301322019-01-09 23:06:32 -0500478// getGroupId returns the group id from the key-value args.
479func getGroupId(kvArgs ...*KVArg) string {
480 for _, arg := range kvArgs {
481 if arg.Key == GroupIdKey {
482 return arg.Value.(string)
483 }
484 }
485 return ""
486}
487
khenaidoo731697e2019-01-29 16:03:29 -0500488// getOffset returns the offset from the key-value args.
489func getOffset(kvArgs ...*KVArg) int64 {
490 for _, arg := range kvArgs {
491 if arg.Key == Offset {
492 return arg.Value.(int64)
493 }
494 }
495 return sarama.OffsetNewest
496}
497
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500498func (sc *SaramaClient) createClusterAdmin() error {
499 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
500 config := sarama.NewConfig()
501 config.Version = sarama.V1_0_0_0
502
503 // Create a cluster Admin
504 var cAdmin sarama.ClusterAdmin
505 var err error
506 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
507 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
508 return err
509 }
510 sc.cAdmin = cAdmin
511 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500512}
513
khenaidood2b6df92018-12-13 16:37:20 -0500514func (sc *SaramaClient) lockTopic(topic *Topic) {
515 sc.lockOfTopicLockMap.Lock()
516 if _, exist := sc.topicLockMap[topic.Name]; exist {
517 sc.lockOfTopicLockMap.Unlock()
518 sc.topicLockMap[topic.Name].Lock()
519 } else {
520 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
521 sc.lockOfTopicLockMap.Unlock()
522 sc.topicLockMap[topic.Name].Lock()
523 }
524}
525
526func (sc *SaramaClient) unLockTopic(topic *Topic) {
527 sc.lockOfTopicLockMap.Lock()
528 defer sc.lockOfTopicLockMap.Unlock()
529 if _, exist := sc.topicLockMap[topic.Name]; exist {
530 sc.topicLockMap[topic.Name].Unlock()
531 }
532}
533
khenaidoo43c82122018-11-22 18:38:28 -0500534func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
535 sc.lockTopicToConsumerChannelMap.Lock()
536 defer sc.lockTopicToConsumerChannelMap.Unlock()
537 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
538 sc.topicToConsumerChannelMap[id] = arg
539 }
540}
541
542func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
543 sc.lockTopicToConsumerChannelMap.Lock()
544 defer sc.lockTopicToConsumerChannelMap.Unlock()
545 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
546 delete(sc.topicToConsumerChannelMap, id)
547 }
548}
549
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500550func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400551 sc.lockTopicToConsumerChannelMap.RLock()
552 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500553
554 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
555 return consumerCh
556 }
557 return nil
558}
559
khenaidoo79232702018-12-04 11:00:41 -0500560func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500561 sc.lockTopicToConsumerChannelMap.Lock()
562 defer sc.lockTopicToConsumerChannelMap.Unlock()
563 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
564 consumerCh.channels = append(consumerCh.channels, ch)
565 return
566 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500567 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
568}
569
570//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
571func closeConsumers(consumers []interface{}) error {
572 var err error
573 for _, consumer := range consumers {
574 // Is it a partition consumers?
575 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
576 if errTemp := partionConsumer.Close(); errTemp != nil {
577 log.Debugw("partition!!!", log.Fields{"err": errTemp})
578 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
579 // This can occur on race condition
580 err = nil
581 } else {
582 err = errTemp
583 }
584 }
585 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
586 if errTemp := groupConsumer.Close(); errTemp != nil {
587 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
588 // This can occur on race condition
589 err = nil
590 } else {
591 err = errTemp
592 }
593 }
594 }
595 }
596 return err
khenaidoo43c82122018-11-22 18:38:28 -0500597}
598
khenaidoo79232702018-12-04 11:00:41 -0500599func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500600 sc.lockTopicToConsumerChannelMap.Lock()
601 defer sc.lockTopicToConsumerChannelMap.Unlock()
602 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
603 // Channel will be closed in the removeChannel method
604 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500605 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500606 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500607 log.Debugw("closing-consumers", log.Fields{"topic": topic})
608 err := closeConsumers(consumerCh.consumers)
609 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500610 delete(sc.topicToConsumerChannelMap, topic.Name)
611 return err
612 }
613 return nil
614 }
615 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
616 return errors.New("topic-does-not-exist")
617}
618
619func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
620 sc.lockTopicToConsumerChannelMap.Lock()
621 defer sc.lockTopicToConsumerChannelMap.Unlock()
622 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
623 for _, ch := range consumerCh.channels {
624 // Channel will be closed in the removeChannel method
625 removeChannel(consumerCh.channels, ch)
626 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500627 err := closeConsumers(consumerCh.consumers)
628 //if err == sarama.ErrUnknownTopicOrPartition {
629 // // Not an error
630 // err = nil
631 //}
632 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500633 delete(sc.topicToConsumerChannelMap, topic.Name)
634 return err
635 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500636 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
637 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500638}
639
640func (sc *SaramaClient) clearConsumerChannelMap() error {
641 sc.lockTopicToConsumerChannelMap.Lock()
642 defer sc.lockTopicToConsumerChannelMap.Unlock()
643 var err error
644 for topic, consumerCh := range sc.topicToConsumerChannelMap {
645 for _, ch := range consumerCh.channels {
646 // Channel will be closed in the removeChannel method
647 removeChannel(consumerCh.channels, ch)
648 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500649 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
650 err = errTemp
651 }
652 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500653 delete(sc.topicToConsumerChannelMap, topic)
654 }
655 return err
656}
657
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500658//createPublisher creates the publisher which is used to send a message onto kafka
659func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500660 // This Creates the publisher
661 config := sarama.NewConfig()
662 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500663 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
664 config.Producer.Flush.Messages = sc.producerFlushMessages
665 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
666 config.Producer.Return.Errors = sc.producerReturnErrors
667 config.Producer.Return.Successes = sc.producerReturnSuccess
668 //config.Producer.RequiredAcks = sarama.WaitForAll
669 config.Producer.RequiredAcks = sarama.WaitForLocal
670
khenaidoo43c82122018-11-22 18:38:28 -0500671 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
672 brokers := []string{kafkaFullAddr}
673
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
675 log.Errorw("error-starting-publisher", log.Fields{"error": err})
676 return err
677 } else {
678 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500679 }
680 log.Info("Kafka-publisher-created")
681 return nil
682}
683
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500684func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500685 config := sarama.NewConfig()
686 config.Consumer.Return.Errors = true
687 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500688 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
689 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500690 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530691 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500692 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
693 brokers := []string{kafkaFullAddr}
694
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500695 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
696 log.Errorw("error-starting-consumers", log.Fields{"error": err})
697 return err
698 } else {
699 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500700 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500701 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500702 return nil
703}
704
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500705// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500706func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500707 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500708 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500709 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500710 //config.Consumer.Return.Errors = true
711 //config.Group.Return.Notifications = false
712 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
713 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500714 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500715 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500716 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
717 brokers := []string{kafkaFullAddr}
718
khenaidoo43c82122018-11-22 18:38:28 -0500719 topics := []string{topic.Name}
720 var consumer *scc.Consumer
721 var err error
722
khenaidooca301322019-01-09 23:06:32 -0500723 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
724 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500725 return nil, err
726 }
khenaidooca301322019-01-09 23:06:32 -0500727 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500728
729 //sc.groupConsumers[topic.Name] = consumer
730 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500731 return consumer, nil
732}
733
khenaidoo43c82122018-11-22 18:38:28 -0500734// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500735// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500736func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500737 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400738 sc.lockTopicToConsumerChannelMap.RLock()
739 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500740 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500741 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500742 c <- protoMessage
743 }(ch)
744 }
745}
746
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500747func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
748 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500749startloop:
750 for {
751 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500752 case err, ok := <-consumer.Errors():
753 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500754 log.Warnw("partition-consumers-error", log.Fields{"error": err})
755 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500756 // Channel is closed
757 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500758 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500759 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500760 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500761 if !ok {
762 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500763 break startloop
764 }
765 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500766 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500767 if err := proto.Unmarshal(msgBody, icm); err != nil {
768 log.Warnw("partition-invalid-message", log.Fields{"error": err})
769 continue
770 }
771 go sc.dispatchToConsumers(consumerChnls, icm)
772 case <-sc.doneCh:
773 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
774 break startloop
775 }
776 }
777 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
778}
779
780func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
781 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
782
783startloop:
784 for {
785 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500786 case err, ok := <-consumer.Errors():
787 if ok {
khenaidooca301322019-01-09 23:06:32 -0500788 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500789 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500790 // channel is closed
791 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500792 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500793 case msg, ok := <-consumer.Messages():
794 if !ok {
795 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500796 break startloop
797 }
khenaidoo297cd252019-02-07 22:10:23 -0500798 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500799 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500800 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500801 if err := proto.Unmarshal(msgBody, icm); err != nil {
802 log.Warnw("invalid-message", log.Fields{"error": err})
803 continue
804 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500805 go sc.dispatchToConsumers(consumerChnls, icm)
806 consumer.MarkOffset(msg, "")
807 case ntf := <-consumer.Notifications():
808 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500809 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500810 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500811 break startloop
812 }
813 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500814 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500815}
816
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500817func (sc *SaramaClient) startConsumers(topic *Topic) error {
818 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
819 var consumerCh *consumerChannels
820 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
821 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
822 return errors.New("consumers-not-exist")
823 }
824 // For each consumer listening for that topic, start a consumption loop
825 for _, consumer := range consumerCh.consumers {
826 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
827 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
828 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
829 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
830 } else {
831 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
832 return errors.New("invalid-consumer")
833 }
834 }
835 return nil
836}
837
838//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
839//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500840func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500841 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500842 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500843
khenaidoo7ff26c72019-01-16 14:55:48 -0500844 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500845 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500846 return nil, err
847 }
848
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500849 consumersIf := make([]interface{}, 0)
850 for _, pConsumer := range pConsumers {
851 consumersIf = append(consumersIf, pConsumer)
852 }
853
854 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500855 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500856 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500857 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500858 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500859 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500860 }
861
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500862 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500863 sc.addTopicToConsumerChannelMap(topic.Name, cc)
864
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500865 //Start a consumers to listen on that specific topic
866 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500867
868 return consumerListeningChannel, nil
869}
870
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500871// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
872// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500873func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500874 // TODO: Replace this development partition consumers with a group consumers
875 var pConsumer *scc.Consumer
876 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500877 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500878 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
879 return nil, err
880 }
881 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
882 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500883 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500884 cc := &consumerChannels{
885 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500886 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500887 }
888
889 // Add the consumers channel to the map
890 sc.addTopicToConsumerChannelMap(topic.Name, cc)
891
892 //Start a consumers to listen on that specific topic
893 go sc.startConsumers(topic)
894
895 return consumerListeningChannel, nil
896}
897
khenaidoo7ff26c72019-01-16 14:55:48 -0500898func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500899 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500900 partitionList, err := sc.consumer.Partitions(topic.Name)
901 if err != nil {
902 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
903 return nil, err
904 }
905
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500906 pConsumers := make([]sarama.PartitionConsumer, 0)
907 for _, partition := range partitionList {
908 var pConsumer sarama.PartitionConsumer
909 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
910 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
911 return nil, err
912 }
913 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500914 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500915 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500916}
917
khenaidoo79232702018-12-04 11:00:41 -0500918func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500919 var i int
khenaidoo79232702018-12-04 11:00:41 -0500920 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500921 for i, channel = range channels {
922 if channel == ch {
923 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
924 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500925 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500926 return channels[:len(channels)-1]
927 }
928 }
929 return channels
930}
khenaidoo7ff26c72019-01-16 14:55:48 -0500931
khenaidoo7ff26c72019-01-16 14:55:48 -0500932func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
933 sc.lockOfGroupConsumers.Lock()
934 defer sc.lockOfGroupConsumers.Unlock()
935 if _, exist := sc.groupConsumers[topic]; !exist {
936 sc.groupConsumers[topic] = consumer
937 }
938}
939
940func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
941 sc.lockOfGroupConsumers.Lock()
942 defer sc.lockOfGroupConsumers.Unlock()
943 if _, exist := sc.groupConsumers[topic]; exist {
944 consumer := sc.groupConsumers[topic]
945 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -0400946 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -0500947 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
948 return err
949 }
950 }
951 return nil
952}