blob: e920a83533bbe736793334d491493edc545f8022 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050025 ic "github.com/opencord/voltha-protos/go/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
// returnErrorFunction is the signature of a callback that takes no arguments
// and reports its outcome as an error.
type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040055 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050056 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050057 consumerType int
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050071 doneCh chan int
72 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050076}
77
78type SaramaClientOption func(*SaramaClient)
79
80func Host(host string) SaramaClientOption {
81 return func(args *SaramaClient) {
82 args.KafkaHost = host
83 }
84}
85
86func Port(port int) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaPort = port
89 }
90}
91
khenaidooca301322019-01-09 23:06:32 -050092func ConsumerGroupPrefix(prefix string) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.consumerGroupPrefix = prefix
95 }
96}
97
98func ConsumerGroupName(name string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupName = name
101 }
102}
103
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500104func ConsumerType(consumer int) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerType = consumer
107 }
108}
109
110func ProducerFlushFrequency(frequency int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.producerFlushFrequency = frequency
113 }
114}
115
116func ProducerFlushMessages(num int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushMessages = num
119 }
120}
121
122func ProducerFlushMaxMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMaxmessages = num
125 }
126}
127
khenaidoo90847922018-12-03 14:47:51 -0500128func ProducerMaxRetries(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerRetryMax = num
131 }
132}
133
134func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryBackOff = duration
137 }
138}
139
140func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500141 return func(args *SaramaClient) {
142 args.producerReturnErrors = opt
143 }
144}
145
khenaidoo90847922018-12-03 14:47:51 -0500146func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500147 return func(args *SaramaClient) {
148 args.producerReturnSuccess = opt
149 }
150}
151
152func ConsumerMaxWait(wait int) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.consumerMaxwait = wait
155 }
156}
157
158func MaxProcessingTime(pTime int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.maxProcessingTime = pTime
161 }
162}
163
164func NumPartitions(number int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.numPartitions = number
167 }
168}
169
170func NumReplicas(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numReplicas = number
173 }
174}
175
176func AutoCreateTopic(opt bool) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.autoCreateTopic = opt
179 }
180}
181
khenaidoo43c82122018-11-22 18:38:28 -0500182func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
183 client := &SaramaClient{
184 KafkaHost: DefaultKafkaHost,
185 KafkaPort: DefaultKafkaPort,
186 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500187 client.consumerType = DefaultConsumerType
188 client.producerFlushFrequency = DefaultProducerFlushFrequency
189 client.producerFlushMessages = DefaultProducerFlushMessages
190 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
191 client.producerReturnErrors = DefaultProducerReturnErrors
192 client.producerReturnSuccess = DefaultProducerReturnSuccess
193 client.producerRetryMax = DefaultProducerRetryMax
194 client.producerRetryBackOff = DefaultProducerRetryBackoff
195 client.consumerMaxwait = DefaultConsumerMaxwait
196 client.maxProcessingTime = DefaultMaxProcessingTime
197 client.numPartitions = DefaultNumberPartitions
198 client.numReplicas = DefaultNumberReplicas
199 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500200
201 for _, option := range opts {
202 option(client)
203 }
204
khenaidooca301322019-01-09 23:06:32 -0500205 client.groupConsumers = make(map[string]*scc.Consumer)
206
khenaidoo43c82122018-11-22 18:38:28 -0500207 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500208 client.topicLockMap = make(map[string]*sync.RWMutex)
209 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500210 client.lockOfGroupConsumers = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500211 return client
212}
213
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500214func (sc *SaramaClient) Start() error {
215 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500216
217 // Create the Done channel
218 sc.doneCh = make(chan int, 1)
219
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500220 var err error
221
222 // Create the Cluster Admin
223 if err = sc.createClusterAdmin(); err != nil {
224 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
225 return err
226 }
227
khenaidoo43c82122018-11-22 18:38:28 -0500228 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500229 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500230 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
231 return err
232 }
233
khenaidooca301322019-01-09 23:06:32 -0500234 if sc.consumerType == DefaultConsumerType {
235 // Create the master consumers
236 if err := sc.createConsumer(); err != nil {
237 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
238 return err
239 }
khenaidoo43c82122018-11-22 18:38:28 -0500240 }
241
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500242 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500243 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
244
khenaidooca301322019-01-09 23:06:32 -0500245 log.Info("kafka-sarama-client-started")
246
khenaidoo43c82122018-11-22 18:38:28 -0500247 return nil
248}
249
250func (sc *SaramaClient) Stop() {
251 log.Info("stopping-sarama-client")
252
253 //Send a message over the done channel to close all long running routines
254 sc.doneCh <- 1
255
khenaidoo43c82122018-11-22 18:38:28 -0500256 if sc.producer != nil {
257 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500258 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500259 }
260 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500261
khenaidoo43c82122018-11-22 18:38:28 -0500262 if sc.consumer != nil {
263 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500264 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500265 }
266 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500267
khenaidooca301322019-01-09 23:06:32 -0500268 for key, val := range sc.groupConsumers {
269 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
270 if err := val.Close(); err != nil {
271 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500272 }
273 }
274
275 if sc.cAdmin != nil {
276 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500277 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500278 }
279 }
280
281 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500282 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500283
284 log.Info("sarama-client-stopped")
285}
286
khenaidooca301322019-01-09 23:06:32 -0500287//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
288// the invoking function must hold the lock
289func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500290 // Set the topic details
291 topicDetail := &sarama.TopicDetail{}
292 topicDetail.NumPartitions = int32(numPartition)
293 topicDetail.ReplicationFactor = int16(repFactor)
294 topicDetail.ConfigEntries = make(map[string]*string)
295 topicDetails := make(map[string]*sarama.TopicDetail)
296 topicDetails[topic.Name] = topicDetail
297
298 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
299 if err == sarama.ErrTopicAlreadyExists {
300 // Not an error
301 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
302 return nil
303 }
304 log.Errorw("create-topic-failure", log.Fields{"error": err})
305 return err
306 }
307 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
308 // do so.
309 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
310 return nil
311}
312
khenaidooca301322019-01-09 23:06:32 -0500313//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
314// ensure no two go routines are performing operations on the same topic
315func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
316 sc.lockTopic(topic)
317 defer sc.unLockTopic(topic)
318
319 return sc.createTopic(topic, numPartition, repFactor)
320}
321
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322//DeleteTopic removes a topic from the kafka Broker
323func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500324 sc.lockTopic(topic)
325 defer sc.unLockTopic(topic)
326
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Remove the topic from the broker
328 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
329 if err == sarama.ErrUnknownTopicOrPartition {
330 // Not an error as does not exist
331 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
332 return nil
333 }
334 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
335 return err
336 }
337
338 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
339 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
340 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
341 return err
342 }
343 return nil
344}
345
346// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
347// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500348func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500349 sc.lockTopic(topic)
350 defer sc.unLockTopic(topic)
351
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500352 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
353
354 // If a consumers already exist for that topic then resuse it
355 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
356 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
357 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500358 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359 sc.addChannelToConsumerChannelMap(topic, ch)
360 return ch, nil
361 }
362
363 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500364 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500365 var err error
366
367 // Use the consumerType option to figure out the type of consumer to launch
368 if sc.consumerType == PartitionConsumer {
369 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500370 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500371 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
372 return nil, err
373 }
374 }
khenaidoo731697e2019-01-29 16:03:29 -0500375 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500376 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
377 return nil, err
378 }
379 } else if sc.consumerType == GroupCustomer {
380 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
381 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500382 //if sc.autoCreateTopic {
383 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
384 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
385 // return nil, err
386 // }
387 //}
388 //groupId := sc.consumerGroupName
389 groupId := getGroupId(kvArgs...)
390 // Include the group prefix
391 if groupId != "" {
392 groupId = sc.consumerGroupPrefix + groupId
393 } else {
394 // Need to use a unique group Id per topic
395 groupId = sc.consumerGroupPrefix + topic.Name
396 }
khenaidoo731697e2019-01-29 16:03:29 -0500397 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500398 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500399 return nil, err
400 }
khenaidooca301322019-01-09 23:06:32 -0500401
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 } else {
403 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
404 return nil, errors.New("unknown-consumer-type")
405 }
406
407 return consumerListeningChannel, nil
408}
409
410//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500411func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500412 sc.lockTopic(topic)
413 defer sc.unLockTopic(topic)
414
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500415 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500416 var err error
417 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
418 log.Errorw("failed-removing-channel", log.Fields{"error": err})
419 }
420 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
421 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
422 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500423 return err
424}
425
426// send formats and sends the request onto the kafka messaging bus.
427func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
428
429 // Assert message is a proto message
430 var protoMsg proto.Message
431 var ok bool
432 // ascertain the value interface type is a proto.Message
433 if protoMsg, ok = msg.(proto.Message); !ok {
434 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
435 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
436 }
437
438 var marshalled []byte
439 var err error
440 // Create the Sarama producer message
441 if marshalled, err = proto.Marshal(protoMsg); err != nil {
442 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
443 return err
444 }
445 key := ""
446 if len(keys) > 0 {
447 key = keys[0] // Only the first key is relevant
448 }
449 kafkaMsg := &sarama.ProducerMessage{
450 Topic: topic.Name,
451 Key: sarama.StringEncoder(key),
452 Value: sarama.ByteEncoder(marshalled),
453 }
454
455 // Send message to kafka
456 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500457 // Wait for result
458 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
459 select {
460 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500461 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
khenaidoo90847922018-12-03 14:47:51 -0500462 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500463 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500464 return notOk
465 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500466 return nil
467}
468
khenaidooca301322019-01-09 23:06:32 -0500469// getGroupId returns the group id from the key-value args.
470func getGroupId(kvArgs ...*KVArg) string {
471 for _, arg := range kvArgs {
472 if arg.Key == GroupIdKey {
473 return arg.Value.(string)
474 }
475 }
476 return ""
477}
478
khenaidoo731697e2019-01-29 16:03:29 -0500479// getOffset returns the offset from the key-value args.
480func getOffset(kvArgs ...*KVArg) int64 {
481 for _, arg := range kvArgs {
482 if arg.Key == Offset {
483 return arg.Value.(int64)
484 }
485 }
486 return sarama.OffsetNewest
487}
488
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500489func (sc *SaramaClient) createClusterAdmin() error {
490 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
491 config := sarama.NewConfig()
492 config.Version = sarama.V1_0_0_0
493
494 // Create a cluster Admin
495 var cAdmin sarama.ClusterAdmin
496 var err error
497 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
498 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
499 return err
500 }
501 sc.cAdmin = cAdmin
502 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500503}
504
khenaidood2b6df92018-12-13 16:37:20 -0500505func (sc *SaramaClient) lockTopic(topic *Topic) {
506 sc.lockOfTopicLockMap.Lock()
507 if _, exist := sc.topicLockMap[topic.Name]; exist {
508 sc.lockOfTopicLockMap.Unlock()
509 sc.topicLockMap[topic.Name].Lock()
510 } else {
511 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
512 sc.lockOfTopicLockMap.Unlock()
513 sc.topicLockMap[topic.Name].Lock()
514 }
515}
516
517func (sc *SaramaClient) unLockTopic(topic *Topic) {
518 sc.lockOfTopicLockMap.Lock()
519 defer sc.lockOfTopicLockMap.Unlock()
520 if _, exist := sc.topicLockMap[topic.Name]; exist {
521 sc.topicLockMap[topic.Name].Unlock()
522 }
523}
524
khenaidoo43c82122018-11-22 18:38:28 -0500525func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
526 sc.lockTopicToConsumerChannelMap.Lock()
527 defer sc.lockTopicToConsumerChannelMap.Unlock()
528 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
529 sc.topicToConsumerChannelMap[id] = arg
530 }
531}
532
533func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
534 sc.lockTopicToConsumerChannelMap.Lock()
535 defer sc.lockTopicToConsumerChannelMap.Unlock()
536 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
537 delete(sc.topicToConsumerChannelMap, id)
538 }
539}
540
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500541func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400542 sc.lockTopicToConsumerChannelMap.RLock()
543 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500544
545 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
546 return consumerCh
547 }
548 return nil
549}
550
khenaidoo79232702018-12-04 11:00:41 -0500551func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500552 sc.lockTopicToConsumerChannelMap.Lock()
553 defer sc.lockTopicToConsumerChannelMap.Unlock()
554 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
555 consumerCh.channels = append(consumerCh.channels, ch)
556 return
557 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500558 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
559}
560
561//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
562func closeConsumers(consumers []interface{}) error {
563 var err error
564 for _, consumer := range consumers {
565 // Is it a partition consumers?
566 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
567 if errTemp := partionConsumer.Close(); errTemp != nil {
568 log.Debugw("partition!!!", log.Fields{"err": errTemp})
569 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
570 // This can occur on race condition
571 err = nil
572 } else {
573 err = errTemp
574 }
575 }
576 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
577 if errTemp := groupConsumer.Close(); errTemp != nil {
578 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
579 // This can occur on race condition
580 err = nil
581 } else {
582 err = errTemp
583 }
584 }
585 }
586 }
587 return err
khenaidoo43c82122018-11-22 18:38:28 -0500588}
589
khenaidoo79232702018-12-04 11:00:41 -0500590func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500591 sc.lockTopicToConsumerChannelMap.Lock()
592 defer sc.lockTopicToConsumerChannelMap.Unlock()
593 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
594 // Channel will be closed in the removeChannel method
595 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500596 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500597 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500598 log.Debugw("closing-consumers", log.Fields{"topic": topic})
599 err := closeConsumers(consumerCh.consumers)
600 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500601 delete(sc.topicToConsumerChannelMap, topic.Name)
602 return err
603 }
604 return nil
605 }
606 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
607 return errors.New("topic-does-not-exist")
608}
609
610func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
611 sc.lockTopicToConsumerChannelMap.Lock()
612 defer sc.lockTopicToConsumerChannelMap.Unlock()
613 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
614 for _, ch := range consumerCh.channels {
615 // Channel will be closed in the removeChannel method
616 removeChannel(consumerCh.channels, ch)
617 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500618 err := closeConsumers(consumerCh.consumers)
619 //if err == sarama.ErrUnknownTopicOrPartition {
620 // // Not an error
621 // err = nil
622 //}
623 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500624 delete(sc.topicToConsumerChannelMap, topic.Name)
625 return err
626 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500627 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
628 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500629}
630
631func (sc *SaramaClient) clearConsumerChannelMap() error {
632 sc.lockTopicToConsumerChannelMap.Lock()
633 defer sc.lockTopicToConsumerChannelMap.Unlock()
634 var err error
635 for topic, consumerCh := range sc.topicToConsumerChannelMap {
636 for _, ch := range consumerCh.channels {
637 // Channel will be closed in the removeChannel method
638 removeChannel(consumerCh.channels, ch)
639 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500640 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
641 err = errTemp
642 }
643 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500644 delete(sc.topicToConsumerChannelMap, topic)
645 }
646 return err
647}
648
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500649//createPublisher creates the publisher which is used to send a message onto kafka
650func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500651 // This Creates the publisher
652 config := sarama.NewConfig()
653 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500654 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
655 config.Producer.Flush.Messages = sc.producerFlushMessages
656 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
657 config.Producer.Return.Errors = sc.producerReturnErrors
658 config.Producer.Return.Successes = sc.producerReturnSuccess
659 //config.Producer.RequiredAcks = sarama.WaitForAll
660 config.Producer.RequiredAcks = sarama.WaitForLocal
661
khenaidoo43c82122018-11-22 18:38:28 -0500662 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
663 brokers := []string{kafkaFullAddr}
664
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500665 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
666 log.Errorw("error-starting-publisher", log.Fields{"error": err})
667 return err
668 } else {
669 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500670 }
671 log.Info("Kafka-publisher-created")
672 return nil
673}
674
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500675func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500676 config := sarama.NewConfig()
677 config.Consumer.Return.Errors = true
678 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500679 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
680 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500681 config.Consumer.Offsets.Initial = sarama.OffsetNewest
682 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
683 brokers := []string{kafkaFullAddr}
684
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500685 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
686 log.Errorw("error-starting-consumers", log.Fields{"error": err})
687 return err
688 } else {
689 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500690 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500691 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500692 return nil
693}
694
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500695// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500696func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500697 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500698 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500699 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500700 //config.Consumer.Return.Errors = true
701 //config.Group.Return.Notifications = false
702 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
703 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500704 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500705 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500706 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
707 brokers := []string{kafkaFullAddr}
708
khenaidoo43c82122018-11-22 18:38:28 -0500709 topics := []string{topic.Name}
710 var consumer *scc.Consumer
711 var err error
712
khenaidooca301322019-01-09 23:06:32 -0500713 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
714 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500715 return nil, err
716 }
khenaidooca301322019-01-09 23:06:32 -0500717 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500718
719 //sc.groupConsumers[topic.Name] = consumer
720 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500721 return consumer, nil
722}
723
khenaidoo43c82122018-11-22 18:38:28 -0500724// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500725// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500726func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500727 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400728 sc.lockTopicToConsumerChannelMap.RLock()
729 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500730 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500731 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500732 c <- protoMessage
733 }(ch)
734 }
735}
736
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500737func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
738 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500739startloop:
740 for {
741 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500742 case err, ok := <-consumer.Errors():
743 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500744 log.Warnw("partition-consumers-error", log.Fields{"error": err})
745 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500746 // Channel is closed
747 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500748 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500749 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500750 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500751 if !ok {
752 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500753 break startloop
754 }
755 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500756 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500757 if err := proto.Unmarshal(msgBody, icm); err != nil {
758 log.Warnw("partition-invalid-message", log.Fields{"error": err})
759 continue
760 }
761 go sc.dispatchToConsumers(consumerChnls, icm)
762 case <-sc.doneCh:
763 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
764 break startloop
765 }
766 }
767 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
768}
769
770func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
771 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
772
773startloop:
774 for {
775 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500776 case err, ok := <-consumer.Errors():
777 if ok {
khenaidooca301322019-01-09 23:06:32 -0500778 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500779 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500780 // channel is closed
781 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500782 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500783 case msg, ok := <-consumer.Messages():
784 if !ok {
785 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500786 break startloop
787 }
khenaidoo297cd252019-02-07 22:10:23 -0500788 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500789 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500790 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500791 if err := proto.Unmarshal(msgBody, icm); err != nil {
792 log.Warnw("invalid-message", log.Fields{"error": err})
793 continue
794 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500795 go sc.dispatchToConsumers(consumerChnls, icm)
796 consumer.MarkOffset(msg, "")
797 case ntf := <-consumer.Notifications():
798 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500799 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500800 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500801 break startloop
802 }
803 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500804 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500805}
806
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500807func (sc *SaramaClient) startConsumers(topic *Topic) error {
808 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
809 var consumerCh *consumerChannels
810 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
811 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
812 return errors.New("consumers-not-exist")
813 }
814 // For each consumer listening for that topic, start a consumption loop
815 for _, consumer := range consumerCh.consumers {
816 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
817 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
818 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
819 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
820 } else {
821 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
822 return errors.New("invalid-consumer")
823 }
824 }
825 return nil
826}
827
828//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
829//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500830func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500831 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500832 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500833
khenaidoo7ff26c72019-01-16 14:55:48 -0500834 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500835 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500836 return nil, err
837 }
838
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500839 consumersIf := make([]interface{}, 0)
840 for _, pConsumer := range pConsumers {
841 consumersIf = append(consumersIf, pConsumer)
842 }
843
844 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500845 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500846 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500847 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500848 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500849 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500850 }
851
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500852 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500853 sc.addTopicToConsumerChannelMap(topic.Name, cc)
854
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500855 //Start a consumers to listen on that specific topic
856 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500857
858 return consumerListeningChannel, nil
859}
860
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500861// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
862// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500863func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500864 // TODO: Replace this development partition consumers with a group consumers
865 var pConsumer *scc.Consumer
866 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500867 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500868 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
869 return nil, err
870 }
871 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
872 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500873 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500874 cc := &consumerChannels{
875 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500876 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500877 }
878
879 // Add the consumers channel to the map
880 sc.addTopicToConsumerChannelMap(topic.Name, cc)
881
882 //Start a consumers to listen on that specific topic
883 go sc.startConsumers(topic)
884
885 return consumerListeningChannel, nil
886}
887
khenaidoo7ff26c72019-01-16 14:55:48 -0500888func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500889 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500890 partitionList, err := sc.consumer.Partitions(topic.Name)
891 if err != nil {
892 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
893 return nil, err
894 }
895
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500896 pConsumers := make([]sarama.PartitionConsumer, 0)
897 for _, partition := range partitionList {
898 var pConsumer sarama.PartitionConsumer
899 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
900 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
901 return nil, err
902 }
903 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500904 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500905 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500906}
907
khenaidoo79232702018-12-04 11:00:41 -0500908func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500909 var i int
khenaidoo79232702018-12-04 11:00:41 -0500910 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500911 for i, channel = range channels {
912 if channel == ch {
913 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
914 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500915 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500916 return channels[:len(channels)-1]
917 }
918 }
919 return channels
920}
khenaidoo7ff26c72019-01-16 14:55:48 -0500921
khenaidoo7ff26c72019-01-16 14:55:48 -0500922func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
923 sc.lockOfGroupConsumers.Lock()
924 defer sc.lockOfGroupConsumers.Unlock()
925 if _, exist := sc.groupConsumers[topic]; !exist {
926 sc.groupConsumers[topic] = consumer
927 }
928}
929
930func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
931 sc.lockOfGroupConsumers.Lock()
932 defer sc.lockOfGroupConsumers.Unlock()
933 if _, exist := sc.groupConsumers[topic]; exist {
934 consumer := sc.groupConsumers[topic]
935 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -0400936 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -0500937 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
938 return err
939 }
940 }
941 return nil
942}