blob: a251c566a7c59b800967d77dd07ec30d66ab2556 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
Scott Bakerf2596722019-09-27 12:39:56 -070021 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050022 scc "github.com/bsm/sarama-cluster"
23 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050024 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070025 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080026 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Bakerf2596722019-09-27 12:39:56 -070027 "strings"
28 "sync"
29 "time"
khenaidoo43c82122018-11-22 18:38:28 -050030)
31
// init registers this package with the voltha log package using log.JSON
// output at log.DebugLevel. The third argument is nil (presumably "no default
// fields" — confirm against the log package's AddPackage signature).
func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}
35
// returnErrorFunction is a function type taking no arguments and returning only
// an error. NOTE(review): it is not referenced in this part of the file;
// confirm it is still used elsewhere in the package.
type returnErrorFunction func() error
37
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	// consumers holds the underlying consumers for the topic; closeConsumers
	// handles sarama.PartitionConsumer and *scc.Consumer values found here.
	consumers []interface{}
	// channels are the subscriber channels for the topic (for example those
	// appended by addChannelToConsumerChannelMap).
	channels []chan *ic.InterContainerMessage
}
45
// SaramaClient represents the messaging proxy. It holds a sarama ClusterAdmin
// (used to create and delete topics), an async producer (used by Send and
// SendLiveness) and either a sarama partition consumer or bsm/sarama-cluster
// group consumers, and records which subscriber channels listen on which topic.
type SaramaClient struct {
	// Broker handles. cAdmin, producer and consumer are created by Start.
	// NOTE(review): client is not set in this part of the file — confirm use.
	cAdmin    sarama.ClusterAdmin
	client    sarama.Client
	KafkaHost string
	KafkaPort int
	producer  sarama.AsyncProducer
	consumer  sarama.Consumer
	// Group consumers; Stop logs each key as the topic. Presumably guarded by
	// lockOfGroupConsumers (its uses are outside this part of the file).
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	// Group consumers use consumerGroupPrefix + (group id or topic name).
	consumerGroupPrefix string
	consumerType        int
	consumerGroupName   string
	// Producer tuning. producerFlushFrequency is converted with
	// time.Duration(...) and no unit in createPublisher. NOTE(review):
	// producerRetryMax / producerRetryBackOff are not applied in createPublisher.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	// Consumer timings, scaled to milliseconds in createConsumer.
	consumerMaxwait   int
	maxProcessingTime int
	// Topic settings used when Subscribe auto-creates a topic.
	numPartitions   int
	numReplicas     int
	autoCreateTopic bool
	// doneCh is created (buffer 1) by Start; Stop sends on it to tell long
	// running routines to exit.
	doneCh chan int
	// Subscriber channels and consumers per topic name.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// Per-topic locks used by lockTopic / unLockTopic, and the guard for the map.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	metadataMaxRetry   int
	// Liveness reporting: alive starts true (NewSaramaClient); liveness is the
	// channel created by EnableLivenessChannel; unchanged states are re-posted at
	// most once per livenessChannelInterval (tracked by lastLivenessTime).
	alive                   bool
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	// started is set at the end of a successful Start and cleared by Stop.
	started bool
}
83
84type SaramaClientOption func(*SaramaClient)
85
86func Host(host string) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaHost = host
89 }
90}
91
92func Port(port int) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.KafkaPort = port
95 }
96}
97
khenaidooca301322019-01-09 23:06:32 -050098func ConsumerGroupPrefix(prefix string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupPrefix = prefix
101 }
102}
103
104func ConsumerGroupName(name string) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerGroupName = name
107 }
108}
109
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500110func ConsumerType(consumer int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.consumerType = consumer
113 }
114}
115
116func ProducerFlushFrequency(frequency int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushFrequency = frequency
119 }
120}
121
122func ProducerFlushMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMessages = num
125 }
126}
127
128func ProducerFlushMaxMessages(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerFlushMaxmessages = num
131 }
132}
133
khenaidoo90847922018-12-03 14:47:51 -0500134func ProducerMaxRetries(num int) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryMax = num
137 }
138}
139
140func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
141 return func(args *SaramaClient) {
142 args.producerRetryBackOff = duration
143 }
144}
145
146func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500147 return func(args *SaramaClient) {
148 args.producerReturnErrors = opt
149 }
150}
151
khenaidoo90847922018-12-03 14:47:51 -0500152func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500153 return func(args *SaramaClient) {
154 args.producerReturnSuccess = opt
155 }
156}
157
158func ConsumerMaxWait(wait int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.consumerMaxwait = wait
161 }
162}
163
164func MaxProcessingTime(pTime int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.maxProcessingTime = pTime
167 }
168}
169
170func NumPartitions(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numPartitions = number
173 }
174}
175
176func NumReplicas(number int) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.numReplicas = number
179 }
180}
181
182func AutoCreateTopic(opt bool) SaramaClientOption {
183 return func(args *SaramaClient) {
184 args.autoCreateTopic = opt
185 }
186}
187
Abhilash S.L294ff522019-06-26 18:14:33 +0530188func MetadatMaxRetries(retry int) SaramaClientOption {
189 return func(args *SaramaClient) {
190 args.metadataMaxRetry = retry
191 }
192}
193
Scott Bakeree6a0872019-10-29 15:59:52 -0700194func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
195 return func(args *SaramaClient) {
196 args.livenessChannelInterval = opt
197 }
198}
199
khenaidoo43c82122018-11-22 18:38:28 -0500200func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
201 client := &SaramaClient{
202 KafkaHost: DefaultKafkaHost,
203 KafkaPort: DefaultKafkaPort,
204 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500205 client.consumerType = DefaultConsumerType
206 client.producerFlushFrequency = DefaultProducerFlushFrequency
207 client.producerFlushMessages = DefaultProducerFlushMessages
208 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
209 client.producerReturnErrors = DefaultProducerReturnErrors
210 client.producerReturnSuccess = DefaultProducerReturnSuccess
211 client.producerRetryMax = DefaultProducerRetryMax
212 client.producerRetryBackOff = DefaultProducerRetryBackoff
213 client.consumerMaxwait = DefaultConsumerMaxwait
214 client.maxProcessingTime = DefaultMaxProcessingTime
215 client.numPartitions = DefaultNumberPartitions
216 client.numReplicas = DefaultNumberReplicas
217 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530218 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700219 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500220
221 for _, option := range opts {
222 option(client)
223 }
224
khenaidooca301322019-01-09 23:06:32 -0500225 client.groupConsumers = make(map[string]*scc.Consumer)
226
khenaidoo43c82122018-11-22 18:38:28 -0500227 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500228 client.topicLockMap = make(map[string]*sync.RWMutex)
229 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500230 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700231
232 // alive until proven otherwise
233 client.alive = true
234
khenaidoo43c82122018-11-22 18:38:28 -0500235 return client
236}
237
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500238func (sc *SaramaClient) Start() error {
239 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500240
241 // Create the Done channel
242 sc.doneCh = make(chan int, 1)
243
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500244 var err error
245
khenaidoob3244212019-08-27 14:32:27 -0400246 // Add a cleanup in case of failure to startup
247 defer func() {
248 if err != nil {
249 sc.Stop()
250 }
251 }()
252
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500253 // Create the Cluster Admin
254 if err = sc.createClusterAdmin(); err != nil {
255 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
256 return err
257 }
258
khenaidoo43c82122018-11-22 18:38:28 -0500259 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500260 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500261 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
262 return err
263 }
264
khenaidooca301322019-01-09 23:06:32 -0500265 if sc.consumerType == DefaultConsumerType {
266 // Create the master consumers
267 if err := sc.createConsumer(); err != nil {
268 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
269 return err
270 }
khenaidoo43c82122018-11-22 18:38:28 -0500271 }
272
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500273 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500274 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
275
khenaidooca301322019-01-09 23:06:32 -0500276 log.Info("kafka-sarama-client-started")
277
Scott Bakeree6a0872019-10-29 15:59:52 -0700278 sc.started = true
279
khenaidoo43c82122018-11-22 18:38:28 -0500280 return nil
281}
282
283func (sc *SaramaClient) Stop() {
284 log.Info("stopping-sarama-client")
285
Scott Bakeree6a0872019-10-29 15:59:52 -0700286 sc.started = false
287
khenaidoo43c82122018-11-22 18:38:28 -0500288 //Send a message over the done channel to close all long running routines
289 sc.doneCh <- 1
290
khenaidoo43c82122018-11-22 18:38:28 -0500291 if sc.producer != nil {
292 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500293 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500294 }
295 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500296
khenaidoo43c82122018-11-22 18:38:28 -0500297 if sc.consumer != nil {
298 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500299 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500300 }
301 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500302
khenaidooca301322019-01-09 23:06:32 -0500303 for key, val := range sc.groupConsumers {
304 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
305 if err := val.Close(); err != nil {
306 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500307 }
308 }
309
310 if sc.cAdmin != nil {
311 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500312 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500313 }
314 }
315
316 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500317 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500318
319 log.Info("sarama-client-stopped")
320}
321
khenaidooca301322019-01-09 23:06:32 -0500322//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
323// the invoking function must hold the lock
324func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500325 // Set the topic details
326 topicDetail := &sarama.TopicDetail{}
327 topicDetail.NumPartitions = int32(numPartition)
328 topicDetail.ReplicationFactor = int16(repFactor)
329 topicDetail.ConfigEntries = make(map[string]*string)
330 topicDetails := make(map[string]*sarama.TopicDetail)
331 topicDetails[topic.Name] = topicDetail
332
333 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
334 if err == sarama.ErrTopicAlreadyExists {
335 // Not an error
336 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
337 return nil
338 }
339 log.Errorw("create-topic-failure", log.Fields{"error": err})
340 return err
341 }
342 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
343 // do so.
344 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
345 return nil
346}
347
khenaidooca301322019-01-09 23:06:32 -0500348//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
349// ensure no two go routines are performing operations on the same topic
350func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
351 sc.lockTopic(topic)
352 defer sc.unLockTopic(topic)
353
354 return sc.createTopic(topic, numPartition, repFactor)
355}
356
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500357//DeleteTopic removes a topic from the kafka Broker
358func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500359 sc.lockTopic(topic)
360 defer sc.unLockTopic(topic)
361
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500362 // Remove the topic from the broker
363 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
364 if err == sarama.ErrUnknownTopicOrPartition {
365 // Not an error as does not exist
366 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
367 return nil
368 }
369 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
370 return err
371 }
372
373 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
374 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
375 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
376 return err
377 }
378 return nil
379}
380
// Subscribe registers a caller on topic and returns the channel on which the
// caller receives the topic's messages. The topic's lock is held throughout.
//
// If consumers already exist for the topic, a new unbuffered channel is
// attached to them. Otherwise consumers are set up according to
// sc.consumerType:
//   - PartitionConsumer: the topic is created first when auto-creation is on.
//   - GroupCustomer: the group id comes from kvArgs, or is the topic name when
//     absent, and is always prefixed with consumerGroupPrefix.
//
// Any other consumer type is an error. The offset from kvArgs is passed to the
// consumer setup.
func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumers already exist for that topic then resuse it
	if existing := sc.getConsumerChannel(topic); existing != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific for that consumers and add it to the consumers channel map
		listener := make(chan *ic.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(topic, listener)
		return listener, nil
	}

	// Register for the topic and set it up
	var (
		listening chan *ic.InterContainerMessage
		err       error
	)

	switch sc.consumerType {
	case PartitionConsumer:
		if sc.autoCreateTopic {
			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		listening, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...))
		if err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}

	case GroupCustomer:
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		groupID := getGroupId(kvArgs...)
		if groupID == "" {
			// Need to use a unique group Id per topic
			groupID = topic.Name
		}
		// Include the group prefix
		groupID = sc.consumerGroupPrefix + groupID
		listening, err = sc.setupGroupConsumerChannel(topic, groupID, getOffset(kvArgs...))
		if err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupID})
			return nil, err
		}

	default:
		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return listening, nil
}
444
445//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500446func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500447 sc.lockTopic(topic)
448 defer sc.unLockTopic(topic)
449
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500450 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500451 var err error
452 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
453 log.Errorw("failed-removing-channel", log.Fields{"error": err})
454 }
455 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
456 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
457 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500458 return err
459}
460
// updateLiveness records the latest liveness observation.
//
// When the liveness channel is enabled, the value is posted in two cases:
//   - immediately, when the state differs from sc.alive;
//   - when the state is unchanged but more than livenessChannelInterval has
//     elapsed since the last post.
//
// The post is a plain channel send, so it blocks if the channel's buffer is
// full. The stored state, and the state-change log line, change only on a
// transition.
func (sc *SaramaClient) updateLiveness(alive bool) {
	// Post a consistent stream of liveness data to the channel,
	// so that in a live state, the core does not timeout and
	// send a forced liveness message. Production of liveness
	// events to the channel is rate-limited by livenessChannelInterval.
	if sc.liveness != nil {
		if sc.alive != alive {
			log.Info("update-liveness-channel-because-change")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
			log.Info("update-liveness-channel-because-interval")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		}
	}

	// Only emit a log message when the state changes. Infow (not Info) is the
	// structured variant that understands log.Fields.
	if sc.alive != alive {
		log.Infow("set-client-alive", log.Fields{"alive": alive})
		sc.alive = alive
	}
}
484
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500485// send formats and sends the request onto the kafka messaging bus.
486func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
487
488 // Assert message is a proto message
489 var protoMsg proto.Message
490 var ok bool
491 // ascertain the value interface type is a proto.Message
492 if protoMsg, ok = msg.(proto.Message); !ok {
493 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
494 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
495 }
496
497 var marshalled []byte
498 var err error
499 // Create the Sarama producer message
500 if marshalled, err = proto.Marshal(protoMsg); err != nil {
501 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
502 return err
503 }
504 key := ""
505 if len(keys) > 0 {
506 key = keys[0] // Only the first key is relevant
507 }
508 kafkaMsg := &sarama.ProducerMessage{
509 Topic: topic.Name,
510 Key: sarama.StringEncoder(key),
511 Value: sarama.ByteEncoder(marshalled),
512 }
513
514 // Send message to kafka
515 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500516 // Wait for result
517 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
518 select {
519 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500520 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700521 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500522 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500523 log.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakeree6a0872019-10-29 15:59:52 -0700524 if strings.Contains(notOk.Error(), "Failed to produce") {
525 sc.updateLiveness(false)
526 }
527 return notOk
528 }
529 return nil
530}
531
532// Enable the liveness monitor channel. This channel will report
533// a "true" or "false" on every publish, which indicates whether
534// or not the channel is still live. This channel is then picked up
535// by the service (i.e. rw_core / ro_core) to update readiness status
536// and/or take other actions.
537func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
538 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
539 if enable {
540 if sc.liveness == nil {
541 log.Info("kafka-create-liveness-channel")
542 // At least 1, so we can immediately post to it without blocking
543 // Setting a bigger number (10) allows the monitor to fall behind
544 // without blocking others. The monitor shouldn't really fall
545 // behind...
546 sc.liveness = make(chan bool, 10)
547 // post intial state to the channel
548 sc.liveness <- sc.alive
549 }
550 } else {
551 // TODO: Think about whether we need the ability to turn off
552 // liveness monitoring
553 panic("Turning off liveness reporting is not supported")
554 }
555 return sc.liveness
556}
557
558// send an empty message on the liveness channel to check whether connectivity has
559// been restored.
560func (sc *SaramaClient) SendLiveness() error {
561 if !sc.started {
562 return fmt.Errorf("SendLiveness() called while not started")
563 }
564
565 kafkaMsg := &sarama.ProducerMessage{
566 Topic: "_liveness_test",
567 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
568 }
569
570 // Send message to kafka
571 sc.producer.Input() <- kafkaMsg
572 // Wait for result
573 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
574 select {
575 case ok := <-sc.producer.Successes():
576 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
577 sc.updateLiveness(true)
578 case notOk := <-sc.producer.Errors():
579 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
580 if strings.Contains(notOk.Error(), "Failed to produce") {
581 sc.updateLiveness(false)
582 }
khenaidoo90847922018-12-03 14:47:51 -0500583 return notOk
584 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500585 return nil
586}
587
khenaidooca301322019-01-09 23:06:32 -0500588// getGroupId returns the group id from the key-value args.
589func getGroupId(kvArgs ...*KVArg) string {
590 for _, arg := range kvArgs {
591 if arg.Key == GroupIdKey {
592 return arg.Value.(string)
593 }
594 }
595 return ""
596}
597
khenaidoo731697e2019-01-29 16:03:29 -0500598// getOffset returns the offset from the key-value args.
599func getOffset(kvArgs ...*KVArg) int64 {
600 for _, arg := range kvArgs {
601 if arg.Key == Offset {
602 return arg.Value.(int64)
603 }
604 }
605 return sarama.OffsetNewest
606}
607
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500608func (sc *SaramaClient) createClusterAdmin() error {
609 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
610 config := sarama.NewConfig()
611 config.Version = sarama.V1_0_0_0
612
613 // Create a cluster Admin
614 var cAdmin sarama.ClusterAdmin
615 var err error
616 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
617 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
618 return err
619 }
620 sc.cAdmin = cAdmin
621 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500622}
623
// lockTopic acquires the write lock dedicated to topic.Name, creating it on
// first use. topicLockMap is only read or written while lockOfTopicLockMap is
// held. The topic lock itself is taken after that guard is released, so
// waiting on a busy topic does not block work on other topics.
func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	topicLock, exist := sc.topicLockMap[topic.Name]
	if !exist {
		topicLock = &sync.RWMutex{}
		sc.topicLockMap[topic.Name] = topicLock
	}
	// Capture the mutex pointer while the map is guarded: looking it up again
	// after Unlock would race with another goroutine inserting a new topic.
	sc.lockOfTopicLockMap.Unlock()
	topicLock.Lock()
}
635
636func (sc *SaramaClient) unLockTopic(topic *Topic) {
637 sc.lockOfTopicLockMap.Lock()
638 defer sc.lockOfTopicLockMap.Unlock()
639 if _, exist := sc.topicLockMap[topic.Name]; exist {
640 sc.topicLockMap[topic.Name].Unlock()
641 }
642}
643
khenaidoo43c82122018-11-22 18:38:28 -0500644func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
645 sc.lockTopicToConsumerChannelMap.Lock()
646 defer sc.lockTopicToConsumerChannelMap.Unlock()
647 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
648 sc.topicToConsumerChannelMap[id] = arg
649 }
650}
651
652func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
653 sc.lockTopicToConsumerChannelMap.Lock()
654 defer sc.lockTopicToConsumerChannelMap.Unlock()
655 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
656 delete(sc.topicToConsumerChannelMap, id)
657 }
658}
659
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500660func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400661 sc.lockTopicToConsumerChannelMap.RLock()
662 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500663
664 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
665 return consumerCh
666 }
667 return nil
668}
669
khenaidoo79232702018-12-04 11:00:41 -0500670func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500671 sc.lockTopicToConsumerChannelMap.Lock()
672 defer sc.lockTopicToConsumerChannelMap.Unlock()
673 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
674 consumerCh.channels = append(consumerCh.channels, ch)
675 return
676 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500677 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
678}
679
680//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
681func closeConsumers(consumers []interface{}) error {
682 var err error
683 for _, consumer := range consumers {
684 // Is it a partition consumers?
685 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
686 if errTemp := partionConsumer.Close(); errTemp != nil {
687 log.Debugw("partition!!!", log.Fields{"err": errTemp})
688 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
689 // This can occur on race condition
690 err = nil
691 } else {
692 err = errTemp
693 }
694 }
695 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
696 if errTemp := groupConsumer.Close(); errTemp != nil {
697 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
698 // This can occur on race condition
699 err = nil
700 } else {
701 err = errTemp
702 }
703 }
704 }
705 }
706 return err
khenaidoo43c82122018-11-22 18:38:28 -0500707}
708
khenaidoo79232702018-12-04 11:00:41 -0500709func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500710 sc.lockTopicToConsumerChannelMap.Lock()
711 defer sc.lockTopicToConsumerChannelMap.Unlock()
712 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
713 // Channel will be closed in the removeChannel method
714 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500715 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500716 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500717 log.Debugw("closing-consumers", log.Fields{"topic": topic})
718 err := closeConsumers(consumerCh.consumers)
719 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500720 delete(sc.topicToConsumerChannelMap, topic.Name)
721 return err
722 }
723 return nil
724 }
725 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
726 return errors.New("topic-does-not-exist")
727}
728
729func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
730 sc.lockTopicToConsumerChannelMap.Lock()
731 defer sc.lockTopicToConsumerChannelMap.Unlock()
732 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
733 for _, ch := range consumerCh.channels {
734 // Channel will be closed in the removeChannel method
735 removeChannel(consumerCh.channels, ch)
736 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500737 err := closeConsumers(consumerCh.consumers)
738 //if err == sarama.ErrUnknownTopicOrPartition {
739 // // Not an error
740 // err = nil
741 //}
742 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500743 delete(sc.topicToConsumerChannelMap, topic.Name)
744 return err
745 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500746 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
747 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500748}
749
750func (sc *SaramaClient) clearConsumerChannelMap() error {
751 sc.lockTopicToConsumerChannelMap.Lock()
752 defer sc.lockTopicToConsumerChannelMap.Unlock()
753 var err error
754 for topic, consumerCh := range sc.topicToConsumerChannelMap {
755 for _, ch := range consumerCh.channels {
756 // Channel will be closed in the removeChannel method
757 removeChannel(consumerCh.channels, ch)
758 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500759 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
760 err = errTemp
761 }
762 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500763 delete(sc.topicToConsumerChannelMap, topic)
764 }
765 return err
766}
767
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500768//createPublisher creates the publisher which is used to send a message onto kafka
769func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500770 // This Creates the publisher
771 config := sarama.NewConfig()
772 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500773 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
774 config.Producer.Flush.Messages = sc.producerFlushMessages
775 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
776 config.Producer.Return.Errors = sc.producerReturnErrors
777 config.Producer.Return.Successes = sc.producerReturnSuccess
778 //config.Producer.RequiredAcks = sarama.WaitForAll
779 config.Producer.RequiredAcks = sarama.WaitForLocal
780
khenaidoo43c82122018-11-22 18:38:28 -0500781 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
782 brokers := []string{kafkaFullAddr}
783
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500784 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
785 log.Errorw("error-starting-publisher", log.Fields{"error": err})
786 return err
787 } else {
788 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500789 }
790 log.Info("Kafka-publisher-created")
791 return nil
792}
793
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500794func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500795 config := sarama.NewConfig()
796 config.Consumer.Return.Errors = true
797 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500798 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
799 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500800 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530801 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500802 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
803 brokers := []string{kafkaFullAddr}
804
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500805 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
806 log.Errorw("error-starting-consumers", log.Fields{"error": err})
807 return err
808 } else {
809 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500810 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500811 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500812 return nil
813}
814
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500815// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500816func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500817 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500818 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500819 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700820 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
821 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500822 //config.Group.Return.Notifications = false
823 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
824 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500825 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500826 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500827 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
828 brokers := []string{kafkaFullAddr}
829
khenaidoo43c82122018-11-22 18:38:28 -0500830 topics := []string{topic.Name}
831 var consumer *scc.Consumer
832 var err error
833
khenaidooca301322019-01-09 23:06:32 -0500834 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
835 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500836 return nil, err
837 }
khenaidooca301322019-01-09 23:06:32 -0500838 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500839
840 //sc.groupConsumers[topic.Name] = consumer
841 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500842 return consumer, nil
843}
844
khenaidoo43c82122018-11-22 18:38:28 -0500845// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500846// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500847func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500848 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400849 sc.lockTopicToConsumerChannelMap.RLock()
850 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500851 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500852 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500853 c <- protoMessage
854 }(ch)
855 }
856}
857
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500858func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
859 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500860startloop:
861 for {
862 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500863 case err, ok := <-consumer.Errors():
864 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500865 log.Warnw("partition-consumers-error", log.Fields{"error": err})
866 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500867 // Channel is closed
868 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500869 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500870 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500871 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500872 if !ok {
873 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500874 break startloop
875 }
876 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500877 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500878 if err := proto.Unmarshal(msgBody, icm); err != nil {
879 log.Warnw("partition-invalid-message", log.Fields{"error": err})
880 continue
881 }
882 go sc.dispatchToConsumers(consumerChnls, icm)
883 case <-sc.doneCh:
884 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
885 break startloop
886 }
887 }
888 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
889}
890
891func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
892 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
893
894startloop:
895 for {
896 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500897 case err, ok := <-consumer.Errors():
898 if ok {
Scott Bakeree6a0872019-10-29 15:59:52 -0700899 sc.updateLiveness(false)
khenaidooca301322019-01-09 23:06:32 -0500900 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500901 } else {
Scott Bakeree6a0872019-10-29 15:59:52 -0700902 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500903 // channel is closed
904 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500905 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500906 case msg, ok := <-consumer.Messages():
907 if !ok {
Scott Bakeree6a0872019-10-29 15:59:52 -0700908 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500909 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500910 break startloop
911 }
Scott Bakeree6a0872019-10-29 15:59:52 -0700912 sc.updateLiveness(true)
khenaidoo297cd252019-02-07 22:10:23 -0500913 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500914 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500915 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500916 if err := proto.Unmarshal(msgBody, icm); err != nil {
917 log.Warnw("invalid-message", log.Fields{"error": err})
918 continue
919 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500920 go sc.dispatchToConsumers(consumerChnls, icm)
921 consumer.MarkOffset(msg, "")
922 case ntf := <-consumer.Notifications():
923 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500924 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500925 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500926 break startloop
927 }
928 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500929 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500930}
931
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500932func (sc *SaramaClient) startConsumers(topic *Topic) error {
933 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
934 var consumerCh *consumerChannels
935 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
936 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
937 return errors.New("consumers-not-exist")
938 }
939 // For each consumer listening for that topic, start a consumption loop
940 for _, consumer := range consumerCh.consumers {
941 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
942 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
943 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
944 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
945 } else {
946 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
947 return errors.New("invalid-consumer")
948 }
949 }
950 return nil
951}
952
953//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
954//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500955func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500956 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500957 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500958
khenaidoo7ff26c72019-01-16 14:55:48 -0500959 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500960 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500961 return nil, err
962 }
963
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500964 consumersIf := make([]interface{}, 0)
965 for _, pConsumer := range pConsumers {
966 consumersIf = append(consumersIf, pConsumer)
967 }
968
969 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500970 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500971 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500972 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500973 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500974 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500975 }
976
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500977 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500978 sc.addTopicToConsumerChannelMap(topic.Name, cc)
979
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500980 //Start a consumers to listen on that specific topic
981 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500982
983 return consumerListeningChannel, nil
984}
985
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500986// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
987// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -0500988func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500989 // TODO: Replace this development partition consumers with a group consumers
990 var pConsumer *scc.Consumer
991 var err error
khenaidoo731697e2019-01-29 16:03:29 -0500992 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500993 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
994 return nil, err
995 }
996 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
997 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500998 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500999 cc := &consumerChannels{
1000 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001001 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001002 }
1003
1004 // Add the consumers channel to the map
1005 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1006
1007 //Start a consumers to listen on that specific topic
1008 go sc.startConsumers(topic)
1009
1010 return consumerListeningChannel, nil
1011}
1012
khenaidoo7ff26c72019-01-16 14:55:48 -05001013func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001014 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001015 partitionList, err := sc.consumer.Partitions(topic.Name)
1016 if err != nil {
1017 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1018 return nil, err
1019 }
1020
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001021 pConsumers := make([]sarama.PartitionConsumer, 0)
1022 for _, partition := range partitionList {
1023 var pConsumer sarama.PartitionConsumer
1024 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1025 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1026 return nil, err
1027 }
1028 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001029 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001030 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001031}
1032
khenaidoo79232702018-12-04 11:00:41 -05001033func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001034 var i int
khenaidoo79232702018-12-04 11:00:41 -05001035 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001036 for i, channel = range channels {
1037 if channel == ch {
1038 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1039 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001040 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001041 return channels[:len(channels)-1]
1042 }
1043 }
1044 return channels
1045}
khenaidoo7ff26c72019-01-16 14:55:48 -05001046
khenaidoo7ff26c72019-01-16 14:55:48 -05001047func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1048 sc.lockOfGroupConsumers.Lock()
1049 defer sc.lockOfGroupConsumers.Unlock()
1050 if _, exist := sc.groupConsumers[topic]; !exist {
1051 sc.groupConsumers[topic] = consumer
1052 }
1053}
1054
1055func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1056 sc.lockOfGroupConsumers.Lock()
1057 defer sc.lockOfGroupConsumers.Unlock()
1058 if _, exist := sc.groupConsumers[topic]; exist {
1059 consumer := sc.groupConsumers[topic]
1060 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001061 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -05001062 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1063 return err
1064 }
1065 }
1066 return nil
1067}