blob: deb72fdf30b11c676c5c3f5f29d02c369ef588d1 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
Esin Karamanccb714b2019-11-29 15:02:06 +000022 "strings"
23 "sync"
24 "time"
25
kdarapub26b4502019-10-05 03:02:33 +053026 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040027 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000028 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040033)
34
William Kurkianea869482019-04-09 15:16:11 -040035// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
36// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
37//consumer or a group consumer
38type consumerChannels struct {
39 consumers []interface{}
40 channels []chan *ic.InterContainerMessage
41}
42
npujarec5762e2020-01-01 14:08:48 +053043// static check to ensure SaramaClient implements Client
44var _ Client = &SaramaClient{}
45
William Kurkianea869482019-04-09 15:16:11 -040046// SaramaClient represents the messaging proxy
47type SaramaClient struct {
48 cAdmin sarama.ClusterAdmin
William Kurkianea869482019-04-09 15:16:11 -040049 KafkaHost string
50 KafkaPort int
51 producer sarama.AsyncProducer
52 consumer sarama.Consumer
53 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040054 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040055 consumerGroupPrefix string
56 consumerType int
57 consumerGroupName string
58 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
70 doneCh chan int
npujarec5762e2020-01-01 14:08:48 +053071 metadataCallback func(fromTopic string, timestamp int64)
William Kurkianea869482019-04-09 15:16:11 -040072 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
74 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
Mahir Gunyele77977b2019-06-27 05:36:22 -070076 metadataMaxRetry int
cbabu95f21522019-11-13 14:25:18 +010077 alive bool
78 liveness chan bool
79 livenessChannelInterval time.Duration
80 lastLivenessTime time.Time
81 started bool
Scott Baker86fce9a2019-12-12 09:47:17 -080082 healthy bool
83 healthiness chan bool
William Kurkianea869482019-04-09 15:16:11 -040084}
85
86type SaramaClientOption func(*SaramaClient)
87
88func Host(host string) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaHost = host
91 }
92}
93
94func Port(port int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.KafkaPort = port
97 }
98}
99
100func ConsumerGroupPrefix(prefix string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupPrefix = prefix
103 }
104}
105
106func ConsumerGroupName(name string) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerGroupName = name
109 }
110}
111
112func ConsumerType(consumer int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.consumerType = consumer
115 }
116}
117
118func ProducerFlushFrequency(frequency int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushFrequency = frequency
121 }
122}
123
124func ProducerFlushMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMessages = num
127 }
128}
129
130func ProducerFlushMaxMessages(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerFlushMaxmessages = num
133 }
134}
135
136func ProducerMaxRetries(num int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryMax = num
139 }
140}
141
142func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerRetryBackOff = duration
145 }
146}
147
148func ProducerReturnOnErrors(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnErrors = opt
151 }
152}
153
154func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.producerReturnSuccess = opt
157 }
158}
159
160func ConsumerMaxWait(wait int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.consumerMaxwait = wait
163 }
164}
165
166func MaxProcessingTime(pTime int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.maxProcessingTime = pTime
169 }
170}
171
172func NumPartitions(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numPartitions = number
175 }
176}
177
178func NumReplicas(number int) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.numReplicas = number
181 }
182}
183
184func AutoCreateTopic(opt bool) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.autoCreateTopic = opt
187 }
188}
189
Mahir Gunyele77977b2019-06-27 05:36:22 -0700190func MetadatMaxRetries(retry int) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.metadataMaxRetry = retry
193 }
194}
195
cbabu95f21522019-11-13 14:25:18 +0100196func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
197 return func(args *SaramaClient) {
198 args.livenessChannelInterval = opt
199 }
200}
201
William Kurkianea869482019-04-09 15:16:11 -0400202func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
203 client := &SaramaClient{
204 KafkaHost: DefaultKafkaHost,
205 KafkaPort: DefaultKafkaPort,
206 }
207 client.consumerType = DefaultConsumerType
208 client.producerFlushFrequency = DefaultProducerFlushFrequency
209 client.producerFlushMessages = DefaultProducerFlushMessages
210 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
211 client.producerReturnErrors = DefaultProducerReturnErrors
212 client.producerReturnSuccess = DefaultProducerReturnSuccess
213 client.producerRetryMax = DefaultProducerRetryMax
214 client.producerRetryBackOff = DefaultProducerRetryBackoff
215 client.consumerMaxwait = DefaultConsumerMaxwait
216 client.maxProcessingTime = DefaultMaxProcessingTime
217 client.numPartitions = DefaultNumberPartitions
218 client.numReplicas = DefaultNumberReplicas
219 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700220 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100221 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400222
223 for _, option := range opts {
224 option(client)
225 }
226
227 client.groupConsumers = make(map[string]*scc.Consumer)
228
229 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
230 client.topicLockMap = make(map[string]*sync.RWMutex)
231 client.lockOfTopicLockMap = sync.RWMutex{}
232 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100233
Scott Baker86fce9a2019-12-12 09:47:17 -0800234 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100235 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800236 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100237
William Kurkianea869482019-04-09 15:16:11 -0400238 return client
239}
240
241func (sc *SaramaClient) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000242 logger.Info("Starting-kafka-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400243
244 // Create the Done channel
245 sc.doneCh = make(chan int, 1)
246
247 var err error
248
Devmalya Paul495b94a2019-08-27 19:42:00 -0400249 // Add a cleanup in case of failure to startup
250 defer func() {
251 if err != nil {
252 sc.Stop()
253 }
254 }()
255
William Kurkianea869482019-04-09 15:16:11 -0400256 // Create the Cluster Admin
257 if err = sc.createClusterAdmin(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000258 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400259 return err
260 }
261
262 // Create the Publisher
263 if err := sc.createPublisher(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000264 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400265 return err
266 }
267
268 if sc.consumerType == DefaultConsumerType {
269 // Create the master consumers
270 if err := sc.createConsumer(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000271 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400272 return err
273 }
274 }
275
276 // Create the topic to consumers/channel map
277 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
278
Esin Karamanccb714b2019-11-29 15:02:06 +0000279 logger.Info("kafka-sarama-client-started")
William Kurkianea869482019-04-09 15:16:11 -0400280
cbabu95f21522019-11-13 14:25:18 +0100281 sc.started = true
282
William Kurkianea869482019-04-09 15:16:11 -0400283 return nil
284}
285
286func (sc *SaramaClient) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000287 logger.Info("stopping-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400288
cbabu95f21522019-11-13 14:25:18 +0100289 sc.started = false
290
William Kurkianea869482019-04-09 15:16:11 -0400291 //Send a message over the done channel to close all long running routines
292 sc.doneCh <- 1
293
294 if sc.producer != nil {
295 if err := sc.producer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000296 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400297 }
298 }
299
300 if sc.consumer != nil {
301 if err := sc.consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000302 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400303 }
304 }
305
306 for key, val := range sc.groupConsumers {
Esin Karamanccb714b2019-11-29 15:02:06 +0000307 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400308 if err := val.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000309 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400310 }
311 }
312
313 if sc.cAdmin != nil {
314 if err := sc.cAdmin.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000315 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400316 }
317 }
318
319 //TODO: Clear the consumers map
320 //sc.clearConsumerChannelMap()
321
Esin Karamanccb714b2019-11-29 15:02:06 +0000322 logger.Info("sarama-client-stopped")
William Kurkianea869482019-04-09 15:16:11 -0400323}
324
325//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
326// the invoking function must hold the lock
327func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
328 // Set the topic details
329 topicDetail := &sarama.TopicDetail{}
330 topicDetail.NumPartitions = int32(numPartition)
331 topicDetail.ReplicationFactor = int16(repFactor)
332 topicDetail.ConfigEntries = make(map[string]*string)
333 topicDetails := make(map[string]*sarama.TopicDetail)
334 topicDetails[topic.Name] = topicDetail
335
336 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
337 if err == sarama.ErrTopicAlreadyExists {
338 // Not an error
Esin Karamanccb714b2019-11-29 15:02:06 +0000339 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400340 return nil
341 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000342 logger.Errorw("create-topic-failure", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400343 return err
344 }
345 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
346 // do so.
Esin Karamanccb714b2019-11-29 15:02:06 +0000347 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
William Kurkianea869482019-04-09 15:16:11 -0400348 return nil
349}
350
351//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
352// ensure no two go routines are performing operations on the same topic
353func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
354 sc.lockTopic(topic)
355 defer sc.unLockTopic(topic)
356
357 return sc.createTopic(topic, numPartition, repFactor)
358}
359
360//DeleteTopic removes a topic from the kafka Broker
361func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
362 sc.lockTopic(topic)
363 defer sc.unLockTopic(topic)
364
365 // Remove the topic from the broker
366 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
367 if err == sarama.ErrUnknownTopicOrPartition {
368 // Not an error as does not exist
Esin Karamanccb714b2019-11-29 15:02:06 +0000369 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400370 return nil
371 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000372 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400373 return err
374 }
375
376 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
377 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000378 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400379 return err
380 }
381 return nil
382}
383
384// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
385// messages from that topic
386func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
387 sc.lockTopic(topic)
388 defer sc.unLockTopic(topic)
389
Esin Karamanccb714b2019-11-29 15:02:06 +0000390 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400391
392 // If a consumers already exist for that topic then resuse it
393 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000394 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400395 // Create a channel specific for that consumers and add it to the consumers channel map
396 ch := make(chan *ic.InterContainerMessage)
397 sc.addChannelToConsumerChannelMap(topic, ch)
398 return ch, nil
399 }
400
401 // Register for the topic and set it up
402 var consumerListeningChannel chan *ic.InterContainerMessage
403 var err error
404
405 // Use the consumerType option to figure out the type of consumer to launch
406 if sc.consumerType == PartitionConsumer {
407 if sc.autoCreateTopic {
408 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000409 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400410 return nil, err
411 }
412 }
413 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000414 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400415 return nil, err
416 }
417 } else if sc.consumerType == GroupCustomer {
418 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
419 // does not consume from a precreated topic in some scenarios
420 //if sc.autoCreateTopic {
421 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000422 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400423 // return nil, err
424 // }
425 //}
426 //groupId := sc.consumerGroupName
427 groupId := getGroupId(kvArgs...)
428 // Include the group prefix
429 if groupId != "" {
430 groupId = sc.consumerGroupPrefix + groupId
431 } else {
432 // Need to use a unique group Id per topic
433 groupId = sc.consumerGroupPrefix + topic.Name
434 }
435 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000436 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400437 return nil, err
438 }
439
440 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000441 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
William Kurkianea869482019-04-09 15:16:11 -0400442 return nil, errors.New("unknown-consumer-type")
443 }
444
445 return consumerListeningChannel, nil
446}
447
448//UnSubscribe unsubscribe a consumer from a given topic
449func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
450 sc.lockTopic(topic)
451 defer sc.unLockTopic(topic)
452
Esin Karamanccb714b2019-11-29 15:02:06 +0000453 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400454 var err error
455 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000456 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400457 }
458 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000459 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400460 }
461 return err
462}
463
npujarec5762e2020-01-01 14:08:48 +0530464func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
465 sc.metadataCallback = callback
466}
467
cbabu95f21522019-11-13 14:25:18 +0100468func (sc *SaramaClient) updateLiveness(alive bool) {
469 // Post a consistent stream of liveness data to the channel,
470 // so that in a live state, the core does not timeout and
471 // send a forced liveness message. Production of liveness
472 // events to the channel is rate-limited by livenessChannelInterval.
473 if sc.liveness != nil {
474 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000475 logger.Info("update-liveness-channel-because-change")
cbabu95f21522019-11-13 14:25:18 +0100476 sc.liveness <- alive
477 sc.lastLivenessTime = time.Now()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000478 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Esin Karamanccb714b2019-11-29 15:02:06 +0000479 logger.Info("update-liveness-channel-because-interval")
cbabu95f21522019-11-13 14:25:18 +0100480 sc.liveness <- alive
481 sc.lastLivenessTime = time.Now()
482 }
483 }
484
485 // Only emit a log message when the state changes
486 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000487 logger.Info("set-client-alive", log.Fields{"alive": alive})
cbabu95f21522019-11-13 14:25:18 +0100488 sc.alive = alive
489 }
490}
491
Scott Baker86fce9a2019-12-12 09:47:17 -0800492// Once unhealthy, we never go back
493func (sc *SaramaClient) setUnhealthy() {
494 sc.healthy = false
495 if sc.healthiness != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000496 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker86fce9a2019-12-12 09:47:17 -0800497 sc.healthiness <- sc.healthy
498 }
499}
500
Devmalya Pauldd23a992019-11-14 07:06:31 +0000501func (sc *SaramaClient) isLivenessError(err error) bool {
502 // Sarama producers and consumers encapsulate the error inside
503 // a ProducerError or ConsumerError struct.
504 if prodError, ok := err.(*sarama.ProducerError); ok {
505 err = prodError.Err
506 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
507 err = consumerError.Err
508 }
509
510 // Sarama-Cluster will compose the error into a ClusterError struct,
511 // which we can't do a compare by reference. To handle that, we the
512 // best we can do is compare the error strings.
513
514 switch err.Error() {
515 case context.DeadlineExceeded.Error():
Esin Karamanccb714b2019-11-29 15:02:06 +0000516 logger.Info("is-liveness-error-timeout")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000517 return true
518 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Esin Karamanccb714b2019-11-29 15:02:06 +0000519 logger.Info("is-liveness-error-no-brokers")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000520 return true
521 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Esin Karamanccb714b2019-11-29 15:02:06 +0000522 logger.Info("is-liveness-error-shutting-down")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000523 return true
524 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Esin Karamanccb714b2019-11-29 15:02:06 +0000525 logger.Info("is-liveness-error-not-available")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000526 return true
527 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Esin Karamanccb714b2019-11-29 15:02:06 +0000528 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000529 return true
530 }
531
532 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Esin Karamanccb714b2019-11-29 15:02:06 +0000533 logger.Info("is-liveness-error-connection-refused")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000534 return true
535 }
536
Scott Bakeree7c0a02020-01-07 11:12:26 -0800537 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Esin Karamanccb714b2019-11-29 15:02:06 +0000538 logger.Info("is-liveness-error-io-timeout")
Scott Bakeree7c0a02020-01-07 11:12:26 -0800539 return true
540 }
541
Devmalya Pauldd23a992019-11-14 07:06:31 +0000542 // Other errors shouldn't trigger a loss of liveness
543
Esin Karamanccb714b2019-11-29 15:02:06 +0000544 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000545
546 return false
547}
548
William Kurkianea869482019-04-09 15:16:11 -0400549// send formats and sends the request onto the kafka messaging bus.
550func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
551
552 // Assert message is a proto message
553 var protoMsg proto.Message
554 var ok bool
555 // ascertain the value interface type is a proto.Message
556 if protoMsg, ok = msg.(proto.Message); !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000557 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000558 return fmt.Errorf("not-a-proto-msg-%s", msg)
William Kurkianea869482019-04-09 15:16:11 -0400559 }
560
561 var marshalled []byte
562 var err error
563 // Create the Sarama producer message
564 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000565 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400566 return err
567 }
568 key := ""
569 if len(keys) > 0 {
570 key = keys[0] // Only the first key is relevant
571 }
572 kafkaMsg := &sarama.ProducerMessage{
573 Topic: topic.Name,
574 Key: sarama.StringEncoder(key),
575 Value: sarama.ByteEncoder(marshalled),
576 }
577
578 // Send message to kafka
579 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400580 // Wait for result
581 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
582 select {
583 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000584 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100585 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -0400586 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000587 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000588 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100589 sc.updateLiveness(false)
590 }
591 return notOk
592 }
593 return nil
594}
595
596// Enable the liveness monitor channel. This channel will report
597// a "true" or "false" on every publish, which indicates whether
598// or not the channel is still live. This channel is then picked up
599// by the service (i.e. rw_core / ro_core) to update readiness status
600// and/or take other actions.
601func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000602 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
cbabu95f21522019-11-13 14:25:18 +0100603 if enable {
604 if sc.liveness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000605 logger.Info("kafka-create-liveness-channel")
cbabu95f21522019-11-13 14:25:18 +0100606 // At least 1, so we can immediately post to it without blocking
607 // Setting a bigger number (10) allows the monitor to fall behind
608 // without blocking others. The monitor shouldn't really fall
609 // behind...
610 sc.liveness = make(chan bool, 10)
611 // post intial state to the channel
612 sc.liveness <- sc.alive
613 }
614 } else {
615 // TODO: Think about whether we need the ability to turn off
616 // liveness monitoring
617 panic("Turning off liveness reporting is not supported")
618 }
619 return sc.liveness
620}
621
Scott Baker86fce9a2019-12-12 09:47:17 -0800622// Enable the Healthiness monitor channel. This channel will report "false"
623// if the kafka consumers die, or some other problem occurs which is
624// catastrophic that would require re-creating the client.
625func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000626 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker86fce9a2019-12-12 09:47:17 -0800627 if enable {
628 if sc.healthiness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000629 logger.Info("kafka-create-healthiness-channel")
Scott Baker86fce9a2019-12-12 09:47:17 -0800630 // At least 1, so we can immediately post to it without blocking
631 // Setting a bigger number (10) allows the monitor to fall behind
632 // without blocking others. The monitor shouldn't really fall
633 // behind...
634 sc.healthiness = make(chan bool, 10)
635 // post intial state to the channel
636 sc.healthiness <- sc.healthy
637 }
638 } else {
639 // TODO: Think about whether we need the ability to turn off
640 // liveness monitoring
641 panic("Turning off healthiness reporting is not supported")
642 }
643 return sc.healthiness
644}
645
cbabu95f21522019-11-13 14:25:18 +0100646// send an empty message on the liveness channel to check whether connectivity has
647// been restored.
648func (sc *SaramaClient) SendLiveness() error {
649 if !sc.started {
650 return fmt.Errorf("SendLiveness() called while not started")
651 }
652
653 kafkaMsg := &sarama.ProducerMessage{
654 Topic: "_liveness_test",
655 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
656 }
657
658 // Send message to kafka
659 sc.producer.Input() <- kafkaMsg
660 // Wait for result
661 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
662 select {
663 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000664 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100665 sc.updateLiveness(true)
666 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000667 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000668 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100669 sc.updateLiveness(false)
670 }
William Kurkianea869482019-04-09 15:16:11 -0400671 return notOk
672 }
673 return nil
674}
675
676// getGroupId returns the group id from the key-value args.
677func getGroupId(kvArgs ...*KVArg) string {
678 for _, arg := range kvArgs {
679 if arg.Key == GroupIdKey {
680 return arg.Value.(string)
681 }
682 }
683 return ""
684}
685
686// getOffset returns the offset from the key-value args.
687func getOffset(kvArgs ...*KVArg) int64 {
688 for _, arg := range kvArgs {
689 if arg.Key == Offset {
690 return arg.Value.(int64)
691 }
692 }
693 return sarama.OffsetNewest
694}
695
696func (sc *SaramaClient) createClusterAdmin() error {
697 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
698 config := sarama.NewConfig()
699 config.Version = sarama.V1_0_0_0
700
701 // Create a cluster Admin
702 var cAdmin sarama.ClusterAdmin
703 var err error
704 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000705 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
William Kurkianea869482019-04-09 15:16:11 -0400706 return err
707 }
708 sc.cAdmin = cAdmin
709 return nil
710}
711
712func (sc *SaramaClient) lockTopic(topic *Topic) {
713 sc.lockOfTopicLockMap.Lock()
714 if _, exist := sc.topicLockMap[topic.Name]; exist {
715 sc.lockOfTopicLockMap.Unlock()
716 sc.topicLockMap[topic.Name].Lock()
717 } else {
718 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
719 sc.lockOfTopicLockMap.Unlock()
720 sc.topicLockMap[topic.Name].Lock()
721 }
722}
723
724func (sc *SaramaClient) unLockTopic(topic *Topic) {
725 sc.lockOfTopicLockMap.Lock()
726 defer sc.lockOfTopicLockMap.Unlock()
727 if _, exist := sc.topicLockMap[topic.Name]; exist {
728 sc.topicLockMap[topic.Name].Unlock()
729 }
730}
731
732func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
733 sc.lockTopicToConsumerChannelMap.Lock()
734 defer sc.lockTopicToConsumerChannelMap.Unlock()
735 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
736 sc.topicToConsumerChannelMap[id] = arg
737 }
738}
739
William Kurkianea869482019-04-09 15:16:11 -0400740func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
741 sc.lockTopicToConsumerChannelMap.RLock()
742 defer sc.lockTopicToConsumerChannelMap.RUnlock()
743
744 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
745 return consumerCh
746 }
747 return nil
748}
749
750func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
751 sc.lockTopicToConsumerChannelMap.Lock()
752 defer sc.lockTopicToConsumerChannelMap.Unlock()
753 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
754 consumerCh.channels = append(consumerCh.channels, ch)
755 return
756 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000757 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400758}
759
760//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
761func closeConsumers(consumers []interface{}) error {
762 var err error
763 for _, consumer := range consumers {
764 // Is it a partition consumers?
765 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
766 if errTemp := partionConsumer.Close(); errTemp != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000767 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
William Kurkianea869482019-04-09 15:16:11 -0400768 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
769 // This can occur on race condition
770 err = nil
771 } else {
772 err = errTemp
773 }
774 }
775 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
776 if errTemp := groupConsumer.Close(); errTemp != nil {
777 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
778 // This can occur on race condition
779 err = nil
780 } else {
781 err = errTemp
782 }
783 }
784 }
785 }
786 return err
787}
788
789func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
790 sc.lockTopicToConsumerChannelMap.Lock()
791 defer sc.lockTopicToConsumerChannelMap.Unlock()
792 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
793 // Channel will be closed in the removeChannel method
794 consumerCh.channels = removeChannel(consumerCh.channels, ch)
795 // If there are no more channels then we can close the consumers itself
796 if len(consumerCh.channels) == 0 {
Esin Karamanccb714b2019-11-29 15:02:06 +0000797 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400798 err := closeConsumers(consumerCh.consumers)
799 //err := consumerCh.consumers.Close()
800 delete(sc.topicToConsumerChannelMap, topic.Name)
801 return err
802 }
803 return nil
804 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000805 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400806 return errors.New("topic-does-not-exist")
807}
808
809func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
810 sc.lockTopicToConsumerChannelMap.Lock()
811 defer sc.lockTopicToConsumerChannelMap.Unlock()
812 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
813 for _, ch := range consumerCh.channels {
814 // Channel will be closed in the removeChannel method
815 removeChannel(consumerCh.channels, ch)
816 }
817 err := closeConsumers(consumerCh.consumers)
818 //if err == sarama.ErrUnknownTopicOrPartition {
819 // // Not an error
820 // err = nil
821 //}
822 //err := consumerCh.consumers.Close()
823 delete(sc.topicToConsumerChannelMap, topic.Name)
824 return err
825 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000826 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400827 return nil
828}
829
William Kurkianea869482019-04-09 15:16:11 -0400830//createPublisher creates the publisher which is used to send a message onto kafka
831func (sc *SaramaClient) createPublisher() error {
832 // This Creates the publisher
833 config := sarama.NewConfig()
834 config.Producer.Partitioner = sarama.NewRandomPartitioner
835 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
836 config.Producer.Flush.Messages = sc.producerFlushMessages
837 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
838 config.Producer.Return.Errors = sc.producerReturnErrors
839 config.Producer.Return.Successes = sc.producerReturnSuccess
840 //config.Producer.RequiredAcks = sarama.WaitForAll
841 config.Producer.RequiredAcks = sarama.WaitForLocal
842
843 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
844 brokers := []string{kafkaFullAddr}
845
846 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000847 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400848 return err
849 } else {
850 sc.producer = producer
851 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000852 logger.Info("Kafka-publisher-created")
William Kurkianea869482019-04-09 15:16:11 -0400853 return nil
854}
855
856func (sc *SaramaClient) createConsumer() error {
857 config := sarama.NewConfig()
858 config.Consumer.Return.Errors = true
859 config.Consumer.Fetch.Min = 1
860 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
861 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
862 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700863 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400864 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
865 brokers := []string{kafkaFullAddr}
866
867 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000868 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400869 return err
870 } else {
871 sc.consumer = consumer
872 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000873 logger.Info("Kafka-consumers-created")
William Kurkianea869482019-04-09 15:16:11 -0400874 return nil
875}
876
877// createGroupConsumer creates a consumers group
878func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
879 config := scc.NewConfig()
880 config.ClientID = uuid.New().String()
881 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100882 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
883 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400884 //config.Group.Return.Notifications = false
885 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
886 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
887 config.Consumer.Offsets.Initial = initialOffset
888 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
889 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
890 brokers := []string{kafkaFullAddr}
891
892 topics := []string{topic.Name}
893 var consumer *scc.Consumer
894 var err error
895
896 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000897 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400898 return nil, err
899 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000900 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400901
902 //sc.groupConsumers[topic.Name] = consumer
903 sc.addToGroupConsumers(topic.Name, consumer)
904 return consumer, nil
905}
906
907// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
908// topic via the unique channel each subscriber received during subscription
909func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
910 // Need to go over all channels and publish messages to them - do we need to copy msg?
911 sc.lockTopicToConsumerChannelMap.RLock()
William Kurkianea869482019-04-09 15:16:11 -0400912 for _, ch := range consumerCh.channels {
913 go func(c chan *ic.InterContainerMessage) {
914 c <- protoMessage
915 }(ch)
916 }
npujarec5762e2020-01-01 14:08:48 +0530917 sc.lockTopicToConsumerChannelMap.RUnlock()
918
919 if callback := sc.metadataCallback; callback != nil {
920 callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
921 }
William Kurkianea869482019-04-09 15:16:11 -0400922}
923
924func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000925 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400926startloop:
927 for {
928 select {
929 case err, ok := <-consumer.Errors():
930 if ok {
cbabu116b73f2019-12-10 17:56:32 +0530931 if sc.isLivenessError(err) {
932 sc.updateLiveness(false)
Esin Karamanccb714b2019-11-29 15:02:06 +0000933 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530934 }
William Kurkianea869482019-04-09 15:16:11 -0400935 } else {
936 // Channel is closed
937 break startloop
938 }
939 case msg, ok := <-consumer.Messages():
Esin Karamanccb714b2019-11-29 15:02:06 +0000940 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400941 if !ok {
942 // channel is closed
943 break startloop
944 }
945 msgBody := msg.Value
cbabu116b73f2019-12-10 17:56:32 +0530946 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000947 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400948 icm := &ic.InterContainerMessage{}
949 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000950 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400951 continue
952 }
953 go sc.dispatchToConsumers(consumerChnls, icm)
954 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000955 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400956 break startloop
957 }
958 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000959 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -0800960 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -0400961}
962
963func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000964 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400965
966startloop:
967 for {
968 select {
969 case err, ok := <-consumer.Errors():
970 if ok {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000971 if sc.isLivenessError(err) {
972 sc.updateLiveness(false)
973 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000974 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400975 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000976 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400977 // channel is closed
978 break startloop
979 }
980 case msg, ok := <-consumer.Messages():
981 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000982 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400983 // Channel closed
984 break startloop
985 }
cbabu95f21522019-11-13 14:25:18 +0100986 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000987 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400988 msgBody := msg.Value
989 icm := &ic.InterContainerMessage{}
990 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000991 logger.Warnw("invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400992 continue
993 }
994 go sc.dispatchToConsumers(consumerChnls, icm)
995 consumer.MarkOffset(msg, "")
996 case ntf := <-consumer.Notifications():
Esin Karamanccb714b2019-11-29 15:02:06 +0000997 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
William Kurkianea869482019-04-09 15:16:11 -0400998 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000999 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001000 break startloop
1001 }
1002 }
Esin Karamanccb714b2019-11-29 15:02:06 +00001003 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -08001004 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -04001005}
1006
1007func (sc *SaramaClient) startConsumers(topic *Topic) error {
Esin Karamanccb714b2019-11-29 15:02:06 +00001008 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001009 var consumerCh *consumerChannels
1010 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001011 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001012 return errors.New("consumers-not-exist")
1013 }
1014 // For each consumer listening for that topic, start a consumption loop
1015 for _, consumer := range consumerCh.consumers {
1016 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1017 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1018 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1019 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1020 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +00001021 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -04001022 return errors.New("invalid-consumer")
1023 }
1024 }
1025 return nil
1026}
1027
1028//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1029//// for that topic. It also starts the routine that listens for messages on that topic.
1030func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1031 var pConsumers []sarama.PartitionConsumer
1032 var err error
1033
1034 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001035 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001036 return nil, err
1037 }
1038
1039 consumersIf := make([]interface{}, 0)
1040 for _, pConsumer := range pConsumers {
1041 consumersIf = append(consumersIf, pConsumer)
1042 }
1043
1044 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1045 // unbuffered to verify race conditions.
1046 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1047 cc := &consumerChannels{
1048 consumers: consumersIf,
1049 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1050 }
1051
1052 // Add the consumers channel to the map
1053 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1054
1055 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001056 go func() {
1057 if err := sc.startConsumers(topic); err != nil {
1058 logger.Errorw("start-consumers-failed", log.Fields{
1059 "topic": topic,
1060 "error": err})
1061 }
1062 }()
William Kurkianea869482019-04-09 15:16:11 -04001063
1064 return consumerListeningChannel, nil
1065}
1066
1067// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1068// for that topic. It also starts the routine that listens for messages on that topic.
1069func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1070 // TODO: Replace this development partition consumers with a group consumers
1071 var pConsumer *scc.Consumer
1072 var err error
1073 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001074 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001075 return nil, err
1076 }
1077 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1078 // unbuffered to verify race conditions.
1079 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1080 cc := &consumerChannels{
1081 consumers: []interface{}{pConsumer},
1082 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1083 }
1084
1085 // Add the consumers channel to the map
1086 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1087
1088 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001089 go func() {
1090 if err := sc.startConsumers(topic); err != nil {
1091 logger.Errorw("start-consumers-failed", log.Fields{
1092 "topic": topic,
1093 "error": err})
1094 }
1095 }()
William Kurkianea869482019-04-09 15:16:11 -04001096
1097 return consumerListeningChannel, nil
1098}
1099
1100func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +00001101 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001102 partitionList, err := sc.consumer.Partitions(topic.Name)
1103 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001104 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001105 return nil, err
1106 }
1107
1108 pConsumers := make([]sarama.PartitionConsumer, 0)
1109 for _, partition := range partitionList {
1110 var pConsumer sarama.PartitionConsumer
1111 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001112 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001113 return nil, err
1114 }
1115 pConsumers = append(pConsumers, pConsumer)
1116 }
1117 return pConsumers, nil
1118}
1119
1120func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1121 var i int
1122 var channel chan *ic.InterContainerMessage
1123 for i, channel = range channels {
1124 if channel == ch {
1125 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1126 close(channel)
Esin Karamanccb714b2019-11-29 15:02:06 +00001127 logger.Debug("channel-closed")
William Kurkianea869482019-04-09 15:16:11 -04001128 return channels[:len(channels)-1]
1129 }
1130 }
1131 return channels
1132}
1133
William Kurkianea869482019-04-09 15:16:11 -04001134func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1135 sc.lockOfGroupConsumers.Lock()
1136 defer sc.lockOfGroupConsumers.Unlock()
1137 if _, exist := sc.groupConsumers[topic]; !exist {
1138 sc.groupConsumers[topic] = consumer
1139 }
1140}
1141
1142func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1143 sc.lockOfGroupConsumers.Lock()
1144 defer sc.lockOfGroupConsumers.Unlock()
1145 if _, exist := sc.groupConsumers[topic]; exist {
1146 consumer := sc.groupConsumers[topic]
1147 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001148 if err := consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001149 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001150 return err
1151 }
1152 }
1153 return nil
1154}