/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
Esin Karamanccb714b2019-11-29 15:02:06 +000022 "strings"
23 "sync"
24 "time"
25
kdarapub26b4502019-10-05 03:02:33 +053026 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040027 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000028 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040033)
34
// returnErrorFunction is a niladic callback that reports its outcome as an error.
type returnErrorFunction func() error
36
37// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
38// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
39//consumer or a group consumer
40type consumerChannels struct {
41 consumers []interface{}
42 channels []chan *ic.InterContainerMessage
43}
44
npujarec5762e2020-01-01 14:08:48 +053045// static check to ensure SaramaClient implements Client
46var _ Client = &SaramaClient{}
47
William Kurkianea869482019-04-09 15:16:11 -040048// SaramaClient represents the messaging proxy
49type SaramaClient struct {
50 cAdmin sarama.ClusterAdmin
51 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
56 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040057 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040058 consumerGroupPrefix string
59 consumerType int
60 consumerGroupName string
61 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
73 doneCh chan int
npujarec5762e2020-01-01 14:08:48 +053074 metadataCallback func(fromTopic string, timestamp int64)
William Kurkianea869482019-04-09 15:16:11 -040075 topicToConsumerChannelMap map[string]*consumerChannels
76 lockTopicToConsumerChannelMap sync.RWMutex
77 topicLockMap map[string]*sync.RWMutex
78 lockOfTopicLockMap sync.RWMutex
Mahir Gunyele77977b2019-06-27 05:36:22 -070079 metadataMaxRetry int
cbabu95f21522019-11-13 14:25:18 +010080 alive bool
81 liveness chan bool
82 livenessChannelInterval time.Duration
83 lastLivenessTime time.Time
84 started bool
Scott Baker86fce9a2019-12-12 09:47:17 -080085 healthy bool
86 healthiness chan bool
William Kurkianea869482019-04-09 15:16:11 -040087}
88
89type SaramaClientOption func(*SaramaClient)
90
91func Host(host string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.KafkaHost = host
94 }
95}
96
97func Port(port int) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.KafkaPort = port
100 }
101}
102
103func ConsumerGroupPrefix(prefix string) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerGroupPrefix = prefix
106 }
107}
108
109func ConsumerGroupName(name string) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.consumerGroupName = name
112 }
113}
114
115func ConsumerType(consumer int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.consumerType = consumer
118 }
119}
120
121func ProducerFlushFrequency(frequency int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushFrequency = frequency
124 }
125}
126
127func ProducerFlushMessages(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerFlushMessages = num
130 }
131}
132
133func ProducerFlushMaxMessages(num int) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerFlushMaxmessages = num
136 }
137}
138
139func ProducerMaxRetries(num int) SaramaClientOption {
140 return func(args *SaramaClient) {
141 args.producerRetryMax = num
142 }
143}
144
145func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
146 return func(args *SaramaClient) {
147 args.producerRetryBackOff = duration
148 }
149}
150
151func ProducerReturnOnErrors(opt bool) SaramaClientOption {
152 return func(args *SaramaClient) {
153 args.producerReturnErrors = opt
154 }
155}
156
157func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
158 return func(args *SaramaClient) {
159 args.producerReturnSuccess = opt
160 }
161}
162
163func ConsumerMaxWait(wait int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.consumerMaxwait = wait
166 }
167}
168
169func MaxProcessingTime(pTime int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.maxProcessingTime = pTime
172 }
173}
174
175func NumPartitions(number int) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.numPartitions = number
178 }
179}
180
181func NumReplicas(number int) SaramaClientOption {
182 return func(args *SaramaClient) {
183 args.numReplicas = number
184 }
185}
186
187func AutoCreateTopic(opt bool) SaramaClientOption {
188 return func(args *SaramaClient) {
189 args.autoCreateTopic = opt
190 }
191}
192
Mahir Gunyele77977b2019-06-27 05:36:22 -0700193func MetadatMaxRetries(retry int) SaramaClientOption {
194 return func(args *SaramaClient) {
195 args.metadataMaxRetry = retry
196 }
197}
198
cbabu95f21522019-11-13 14:25:18 +0100199func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
200 return func(args *SaramaClient) {
201 args.livenessChannelInterval = opt
202 }
203}
204
William Kurkianea869482019-04-09 15:16:11 -0400205func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
206 client := &SaramaClient{
207 KafkaHost: DefaultKafkaHost,
208 KafkaPort: DefaultKafkaPort,
209 }
210 client.consumerType = DefaultConsumerType
211 client.producerFlushFrequency = DefaultProducerFlushFrequency
212 client.producerFlushMessages = DefaultProducerFlushMessages
213 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
214 client.producerReturnErrors = DefaultProducerReturnErrors
215 client.producerReturnSuccess = DefaultProducerReturnSuccess
216 client.producerRetryMax = DefaultProducerRetryMax
217 client.producerRetryBackOff = DefaultProducerRetryBackoff
218 client.consumerMaxwait = DefaultConsumerMaxwait
219 client.maxProcessingTime = DefaultMaxProcessingTime
220 client.numPartitions = DefaultNumberPartitions
221 client.numReplicas = DefaultNumberReplicas
222 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700223 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100224 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400225
226 for _, option := range opts {
227 option(client)
228 }
229
230 client.groupConsumers = make(map[string]*scc.Consumer)
231
232 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
233 client.topicLockMap = make(map[string]*sync.RWMutex)
234 client.lockOfTopicLockMap = sync.RWMutex{}
235 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100236
Scott Baker86fce9a2019-12-12 09:47:17 -0800237 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100238 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800239 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100240
William Kurkianea869482019-04-09 15:16:11 -0400241 return client
242}
243
244func (sc *SaramaClient) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000245 logger.Info("Starting-kafka-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400246
247 // Create the Done channel
248 sc.doneCh = make(chan int, 1)
249
250 var err error
251
Devmalya Paul495b94a2019-08-27 19:42:00 -0400252 // Add a cleanup in case of failure to startup
253 defer func() {
254 if err != nil {
255 sc.Stop()
256 }
257 }()
258
William Kurkianea869482019-04-09 15:16:11 -0400259 // Create the Cluster Admin
260 if err = sc.createClusterAdmin(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000261 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400262 return err
263 }
264
265 // Create the Publisher
266 if err := sc.createPublisher(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000267 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400268 return err
269 }
270
271 if sc.consumerType == DefaultConsumerType {
272 // Create the master consumers
273 if err := sc.createConsumer(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000274 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400275 return err
276 }
277 }
278
279 // Create the topic to consumers/channel map
280 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
281
Esin Karamanccb714b2019-11-29 15:02:06 +0000282 logger.Info("kafka-sarama-client-started")
William Kurkianea869482019-04-09 15:16:11 -0400283
cbabu95f21522019-11-13 14:25:18 +0100284 sc.started = true
285
William Kurkianea869482019-04-09 15:16:11 -0400286 return nil
287}
288
289func (sc *SaramaClient) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000290 logger.Info("stopping-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400291
cbabu95f21522019-11-13 14:25:18 +0100292 sc.started = false
293
William Kurkianea869482019-04-09 15:16:11 -0400294 //Send a message over the done channel to close all long running routines
295 sc.doneCh <- 1
296
297 if sc.producer != nil {
298 if err := sc.producer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000299 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400300 }
301 }
302
303 if sc.consumer != nil {
304 if err := sc.consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000305 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400306 }
307 }
308
309 for key, val := range sc.groupConsumers {
Esin Karamanccb714b2019-11-29 15:02:06 +0000310 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400311 if err := val.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000312 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400313 }
314 }
315
316 if sc.cAdmin != nil {
317 if err := sc.cAdmin.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000318 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400319 }
320 }
321
322 //TODO: Clear the consumers map
323 //sc.clearConsumerChannelMap()
324
Esin Karamanccb714b2019-11-29 15:02:06 +0000325 logger.Info("sarama-client-stopped")
William Kurkianea869482019-04-09 15:16:11 -0400326}
327
328//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
329// the invoking function must hold the lock
330func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
331 // Set the topic details
332 topicDetail := &sarama.TopicDetail{}
333 topicDetail.NumPartitions = int32(numPartition)
334 topicDetail.ReplicationFactor = int16(repFactor)
335 topicDetail.ConfigEntries = make(map[string]*string)
336 topicDetails := make(map[string]*sarama.TopicDetail)
337 topicDetails[topic.Name] = topicDetail
338
339 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
340 if err == sarama.ErrTopicAlreadyExists {
341 // Not an error
Esin Karamanccb714b2019-11-29 15:02:06 +0000342 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400343 return nil
344 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000345 logger.Errorw("create-topic-failure", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400346 return err
347 }
348 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
349 // do so.
Esin Karamanccb714b2019-11-29 15:02:06 +0000350 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
William Kurkianea869482019-04-09 15:16:11 -0400351 return nil
352}
353
354//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
355// ensure no two go routines are performing operations on the same topic
356func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
360 return sc.createTopic(topic, numPartition, repFactor)
361}
362
363//DeleteTopic removes a topic from the kafka Broker
364func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
365 sc.lockTopic(topic)
366 defer sc.unLockTopic(topic)
367
368 // Remove the topic from the broker
369 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
370 if err == sarama.ErrUnknownTopicOrPartition {
371 // Not an error as does not exist
Esin Karamanccb714b2019-11-29 15:02:06 +0000372 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400373 return nil
374 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000375 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400376 return err
377 }
378
379 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
380 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000381 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400382 return err
383 }
384 return nil
385}
386
387// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
388// messages from that topic
389func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
390 sc.lockTopic(topic)
391 defer sc.unLockTopic(topic)
392
Esin Karamanccb714b2019-11-29 15:02:06 +0000393 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400394
395 // If a consumers already exist for that topic then resuse it
396 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000397 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400398 // Create a channel specific for that consumers and add it to the consumers channel map
399 ch := make(chan *ic.InterContainerMessage)
400 sc.addChannelToConsumerChannelMap(topic, ch)
401 return ch, nil
402 }
403
404 // Register for the topic and set it up
405 var consumerListeningChannel chan *ic.InterContainerMessage
406 var err error
407
408 // Use the consumerType option to figure out the type of consumer to launch
409 if sc.consumerType == PartitionConsumer {
410 if sc.autoCreateTopic {
411 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000412 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400413 return nil, err
414 }
415 }
416 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000417 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400418 return nil, err
419 }
420 } else if sc.consumerType == GroupCustomer {
421 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
422 // does not consume from a precreated topic in some scenarios
423 //if sc.autoCreateTopic {
424 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000425 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400426 // return nil, err
427 // }
428 //}
429 //groupId := sc.consumerGroupName
430 groupId := getGroupId(kvArgs...)
431 // Include the group prefix
432 if groupId != "" {
433 groupId = sc.consumerGroupPrefix + groupId
434 } else {
435 // Need to use a unique group Id per topic
436 groupId = sc.consumerGroupPrefix + topic.Name
437 }
438 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000439 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400440 return nil, err
441 }
442
443 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000444 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
William Kurkianea869482019-04-09 15:16:11 -0400445 return nil, errors.New("unknown-consumer-type")
446 }
447
448 return consumerListeningChannel, nil
449}
450
451//UnSubscribe unsubscribe a consumer from a given topic
452func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
453 sc.lockTopic(topic)
454 defer sc.unLockTopic(topic)
455
Esin Karamanccb714b2019-11-29 15:02:06 +0000456 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400457 var err error
458 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000459 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400460 }
461 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000462 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400463 }
464 return err
465}
466
npujarec5762e2020-01-01 14:08:48 +0530467func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
468 sc.metadataCallback = callback
469}
470
cbabu95f21522019-11-13 14:25:18 +0100471func (sc *SaramaClient) updateLiveness(alive bool) {
472 // Post a consistent stream of liveness data to the channel,
473 // so that in a live state, the core does not timeout and
474 // send a forced liveness message. Production of liveness
475 // events to the channel is rate-limited by livenessChannelInterval.
476 if sc.liveness != nil {
477 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000478 logger.Info("update-liveness-channel-because-change")
cbabu95f21522019-11-13 14:25:18 +0100479 sc.liveness <- alive
480 sc.lastLivenessTime = time.Now()
481 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
Esin Karamanccb714b2019-11-29 15:02:06 +0000482 logger.Info("update-liveness-channel-because-interval")
cbabu95f21522019-11-13 14:25:18 +0100483 sc.liveness <- alive
484 sc.lastLivenessTime = time.Now()
485 }
486 }
487
488 // Only emit a log message when the state changes
489 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000490 logger.Info("set-client-alive", log.Fields{"alive": alive})
cbabu95f21522019-11-13 14:25:18 +0100491 sc.alive = alive
492 }
493}
494
Scott Baker86fce9a2019-12-12 09:47:17 -0800495// Once unhealthy, we never go back
496func (sc *SaramaClient) setUnhealthy() {
497 sc.healthy = false
498 if sc.healthiness != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000499 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker86fce9a2019-12-12 09:47:17 -0800500 sc.healthiness <- sc.healthy
501 }
502}
503
Devmalya Pauldd23a992019-11-14 07:06:31 +0000504func (sc *SaramaClient) isLivenessError(err error) bool {
505 // Sarama producers and consumers encapsulate the error inside
506 // a ProducerError or ConsumerError struct.
507 if prodError, ok := err.(*sarama.ProducerError); ok {
508 err = prodError.Err
509 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
510 err = consumerError.Err
511 }
512
513 // Sarama-Cluster will compose the error into a ClusterError struct,
514 // which we can't do a compare by reference. To handle that, we the
515 // best we can do is compare the error strings.
516
517 switch err.Error() {
518 case context.DeadlineExceeded.Error():
Esin Karamanccb714b2019-11-29 15:02:06 +0000519 logger.Info("is-liveness-error-timeout")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000520 return true
521 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Esin Karamanccb714b2019-11-29 15:02:06 +0000522 logger.Info("is-liveness-error-no-brokers")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000523 return true
524 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Esin Karamanccb714b2019-11-29 15:02:06 +0000525 logger.Info("is-liveness-error-shutting-down")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000526 return true
527 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Esin Karamanccb714b2019-11-29 15:02:06 +0000528 logger.Info("is-liveness-error-not-available")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000529 return true
530 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Esin Karamanccb714b2019-11-29 15:02:06 +0000531 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000532 return true
533 }
534
535 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Esin Karamanccb714b2019-11-29 15:02:06 +0000536 logger.Info("is-liveness-error-connection-refused")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000537 return true
538 }
539
Scott Bakeree7c0a02020-01-07 11:12:26 -0800540 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Esin Karamanccb714b2019-11-29 15:02:06 +0000541 logger.Info("is-liveness-error-io-timeout")
Scott Bakeree7c0a02020-01-07 11:12:26 -0800542 return true
543 }
544
Devmalya Pauldd23a992019-11-14 07:06:31 +0000545 // Other errors shouldn't trigger a loss of liveness
546
Esin Karamanccb714b2019-11-29 15:02:06 +0000547 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000548
549 return false
550}
551
William Kurkianea869482019-04-09 15:16:11 -0400552// send formats and sends the request onto the kafka messaging bus.
553func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
554
555 // Assert message is a proto message
556 var protoMsg proto.Message
557 var ok bool
558 // ascertain the value interface type is a proto.Message
559 if protoMsg, ok = msg.(proto.Message); !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000560 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
William Kurkianea869482019-04-09 15:16:11 -0400561 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
562 }
563
564 var marshalled []byte
565 var err error
566 // Create the Sarama producer message
567 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000568 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400569 return err
570 }
571 key := ""
572 if len(keys) > 0 {
573 key = keys[0] // Only the first key is relevant
574 }
575 kafkaMsg := &sarama.ProducerMessage{
576 Topic: topic.Name,
577 Key: sarama.StringEncoder(key),
578 Value: sarama.ByteEncoder(marshalled),
579 }
580
581 // Send message to kafka
582 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400583 // Wait for result
584 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
585 select {
586 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000587 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100588 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -0400589 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000590 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000591 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100592 sc.updateLiveness(false)
593 }
594 return notOk
595 }
596 return nil
597}
598
599// Enable the liveness monitor channel. This channel will report
600// a "true" or "false" on every publish, which indicates whether
601// or not the channel is still live. This channel is then picked up
602// by the service (i.e. rw_core / ro_core) to update readiness status
603// and/or take other actions.
604func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000605 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
cbabu95f21522019-11-13 14:25:18 +0100606 if enable {
607 if sc.liveness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000608 logger.Info("kafka-create-liveness-channel")
cbabu95f21522019-11-13 14:25:18 +0100609 // At least 1, so we can immediately post to it without blocking
610 // Setting a bigger number (10) allows the monitor to fall behind
611 // without blocking others. The monitor shouldn't really fall
612 // behind...
613 sc.liveness = make(chan bool, 10)
614 // post intial state to the channel
615 sc.liveness <- sc.alive
616 }
617 } else {
618 // TODO: Think about whether we need the ability to turn off
619 // liveness monitoring
620 panic("Turning off liveness reporting is not supported")
621 }
622 return sc.liveness
623}
624
Scott Baker86fce9a2019-12-12 09:47:17 -0800625// Enable the Healthiness monitor channel. This channel will report "false"
626// if the kafka consumers die, or some other problem occurs which is
627// catastrophic that would require re-creating the client.
628func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000629 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker86fce9a2019-12-12 09:47:17 -0800630 if enable {
631 if sc.healthiness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000632 logger.Info("kafka-create-healthiness-channel")
Scott Baker86fce9a2019-12-12 09:47:17 -0800633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post intial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // liveness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
cbabu95f21522019-11-13 14:25:18 +0100649// send an empty message on the liveness channel to check whether connectivity has
650// been restored.
651func (sc *SaramaClient) SendLiveness() error {
652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000667 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100668 sc.updateLiveness(true)
669 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000670 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000671 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100672 sc.updateLiveness(false)
673 }
William Kurkianea869482019-04-09 15:16:11 -0400674 return notOk
675 }
676 return nil
677}
678
679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
699func (sc *SaramaClient) createClusterAdmin() error {
700 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
707 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000708 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
William Kurkianea869482019-04-09 15:16:11 -0400709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
743func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
744 sc.lockTopicToConsumerChannelMap.Lock()
745 defer sc.lockTopicToConsumerChannelMap.Unlock()
746 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
747 delete(sc.topicToConsumerChannelMap, id)
748 }
749}
750
751func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
752 sc.lockTopicToConsumerChannelMap.RLock()
753 defer sc.lockTopicToConsumerChannelMap.RUnlock()
754
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 return consumerCh
757 }
758 return nil
759}
760
761func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
762 sc.lockTopicToConsumerChannelMap.Lock()
763 defer sc.lockTopicToConsumerChannelMap.Unlock()
764 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
765 consumerCh.channels = append(consumerCh.channels, ch)
766 return
767 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000768 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400769}
770
771//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
772func closeConsumers(consumers []interface{}) error {
773 var err error
774 for _, consumer := range consumers {
775 // Is it a partition consumers?
776 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
777 if errTemp := partionConsumer.Close(); errTemp != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000778 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
William Kurkianea869482019-04-09 15:16:11 -0400779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
787 if errTemp := groupConsumer.Close(); errTemp != nil {
788 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
789 // This can occur on race condition
790 err = nil
791 } else {
792 err = errTemp
793 }
794 }
795 }
796 }
797 return err
798}
799
800func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
801 sc.lockTopicToConsumerChannelMap.Lock()
802 defer sc.lockTopicToConsumerChannelMap.Unlock()
803 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
804 // Channel will be closed in the removeChannel method
805 consumerCh.channels = removeChannel(consumerCh.channels, ch)
806 // If there are no more channels then we can close the consumers itself
807 if len(consumerCh.channels) == 0 {
Esin Karamanccb714b2019-11-29 15:02:06 +0000808 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400809 err := closeConsumers(consumerCh.consumers)
810 //err := consumerCh.consumers.Close()
811 delete(sc.topicToConsumerChannelMap, topic.Name)
812 return err
813 }
814 return nil
815 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000816 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400817 return errors.New("topic-does-not-exist")
818}
819
820func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
821 sc.lockTopicToConsumerChannelMap.Lock()
822 defer sc.lockTopicToConsumerChannelMap.Unlock()
823 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
824 for _, ch := range consumerCh.channels {
825 // Channel will be closed in the removeChannel method
826 removeChannel(consumerCh.channels, ch)
827 }
828 err := closeConsumers(consumerCh.consumers)
829 //if err == sarama.ErrUnknownTopicOrPartition {
830 // // Not an error
831 // err = nil
832 //}
833 //err := consumerCh.consumers.Close()
834 delete(sc.topicToConsumerChannelMap, topic.Name)
835 return err
836 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000837 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400838 return nil
839}
840
841func (sc *SaramaClient) clearConsumerChannelMap() error {
842 sc.lockTopicToConsumerChannelMap.Lock()
843 defer sc.lockTopicToConsumerChannelMap.Unlock()
844 var err error
845 for topic, consumerCh := range sc.topicToConsumerChannelMap {
846 for _, ch := range consumerCh.channels {
847 // Channel will be closed in the removeChannel method
848 removeChannel(consumerCh.channels, ch)
849 }
850 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
851 err = errTemp
852 }
853 //err = consumerCh.consumers.Close()
854 delete(sc.topicToConsumerChannelMap, topic)
855 }
856 return err
857}
858
859//createPublisher creates the publisher which is used to send a message onto kafka
860func (sc *SaramaClient) createPublisher() error {
861 // This Creates the publisher
862 config := sarama.NewConfig()
863 config.Producer.Partitioner = sarama.NewRandomPartitioner
864 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
865 config.Producer.Flush.Messages = sc.producerFlushMessages
866 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
867 config.Producer.Return.Errors = sc.producerReturnErrors
868 config.Producer.Return.Successes = sc.producerReturnSuccess
869 //config.Producer.RequiredAcks = sarama.WaitForAll
870 config.Producer.RequiredAcks = sarama.WaitForLocal
871
872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
875 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000876 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400877 return err
878 } else {
879 sc.producer = producer
880 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000881 logger.Info("Kafka-publisher-created")
William Kurkianea869482019-04-09 15:16:11 -0400882 return nil
883}
884
885func (sc *SaramaClient) createConsumer() error {
886 config := sarama.NewConfig()
887 config.Consumer.Return.Errors = true
888 config.Consumer.Fetch.Min = 1
889 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
890 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
891 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700892 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400893 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
894 brokers := []string{kafkaFullAddr}
895
896 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000897 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400898 return err
899 } else {
900 sc.consumer = consumer
901 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000902 logger.Info("Kafka-consumers-created")
William Kurkianea869482019-04-09 15:16:11 -0400903 return nil
904}
905
906// createGroupConsumer creates a consumers group
907func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
908 config := scc.NewConfig()
909 config.ClientID = uuid.New().String()
910 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100911 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
912 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400913 //config.Group.Return.Notifications = false
914 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
915 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
916 config.Consumer.Offsets.Initial = initialOffset
917 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
918 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
919 brokers := []string{kafkaFullAddr}
920
921 topics := []string{topic.Name}
922 var consumer *scc.Consumer
923 var err error
924
925 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000926 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400927 return nil, err
928 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000929 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400930
931 //sc.groupConsumers[topic.Name] = consumer
932 sc.addToGroupConsumers(topic.Name, consumer)
933 return consumer, nil
934}
935
936// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
937// topic via the unique channel each subscriber received during subscription
938func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
939 // Need to go over all channels and publish messages to them - do we need to copy msg?
940 sc.lockTopicToConsumerChannelMap.RLock()
William Kurkianea869482019-04-09 15:16:11 -0400941 for _, ch := range consumerCh.channels {
942 go func(c chan *ic.InterContainerMessage) {
943 c <- protoMessage
944 }(ch)
945 }
npujarec5762e2020-01-01 14:08:48 +0530946 sc.lockTopicToConsumerChannelMap.RUnlock()
947
948 if callback := sc.metadataCallback; callback != nil {
949 callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
950 }
William Kurkianea869482019-04-09 15:16:11 -0400951}
952
953func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000954 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400955startloop:
956 for {
957 select {
958 case err, ok := <-consumer.Errors():
959 if ok {
cbabu116b73f2019-12-10 17:56:32 +0530960 if sc.isLivenessError(err) {
961 sc.updateLiveness(false)
Esin Karamanccb714b2019-11-29 15:02:06 +0000962 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530963 }
William Kurkianea869482019-04-09 15:16:11 -0400964 } else {
965 // Channel is closed
966 break startloop
967 }
968 case msg, ok := <-consumer.Messages():
Esin Karamanccb714b2019-11-29 15:02:06 +0000969 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400970 if !ok {
971 // channel is closed
972 break startloop
973 }
974 msgBody := msg.Value
cbabu116b73f2019-12-10 17:56:32 +0530975 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000976 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400977 icm := &ic.InterContainerMessage{}
978 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000979 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400980 continue
981 }
982 go sc.dispatchToConsumers(consumerChnls, icm)
983 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000984 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400985 break startloop
986 }
987 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000988 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -0800989 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -0400990}
991
992func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000993 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400994
995startloop:
996 for {
997 select {
998 case err, ok := <-consumer.Errors():
999 if ok {
Devmalya Pauldd23a992019-11-14 07:06:31 +00001000 if sc.isLivenessError(err) {
1001 sc.updateLiveness(false)
1002 }
Esin Karamanccb714b2019-11-29 15:02:06 +00001003 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
William Kurkianea869482019-04-09 15:16:11 -04001004 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +00001005 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001006 // channel is closed
1007 break startloop
1008 }
1009 case msg, ok := <-consumer.Messages():
1010 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +00001011 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001012 // Channel closed
1013 break startloop
1014 }
cbabu95f21522019-11-13 14:25:18 +01001015 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +00001016 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -04001017 msgBody := msg.Value
1018 icm := &ic.InterContainerMessage{}
1019 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001020 logger.Warnw("invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001021 continue
1022 }
1023 go sc.dispatchToConsumers(consumerChnls, icm)
1024 consumer.MarkOffset(msg, "")
1025 case ntf := <-consumer.Notifications():
Esin Karamanccb714b2019-11-29 15:02:06 +00001026 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
William Kurkianea869482019-04-09 15:16:11 -04001027 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +00001028 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001029 break startloop
1030 }
1031 }
Esin Karamanccb714b2019-11-29 15:02:06 +00001032 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -08001033 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -04001034}
1035
1036func (sc *SaramaClient) startConsumers(topic *Topic) error {
Esin Karamanccb714b2019-11-29 15:02:06 +00001037 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001038 var consumerCh *consumerChannels
1039 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001040 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001041 return errors.New("consumers-not-exist")
1042 }
1043 // For each consumer listening for that topic, start a consumption loop
1044 for _, consumer := range consumerCh.consumers {
1045 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1046 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1047 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1048 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1049 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +00001050 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -04001051 return errors.New("invalid-consumer")
1052 }
1053 }
1054 return nil
1055}
1056
1057//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1058//// for that topic. It also starts the routine that listens for messages on that topic.
1059func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1060 var pConsumers []sarama.PartitionConsumer
1061 var err error
1062
1063 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001064 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001065 return nil, err
1066 }
1067
1068 consumersIf := make([]interface{}, 0)
1069 for _, pConsumer := range pConsumers {
1070 consumersIf = append(consumersIf, pConsumer)
1071 }
1072
1073 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1074 // unbuffered to verify race conditions.
1075 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1076 cc := &consumerChannels{
1077 consumers: consumersIf,
1078 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1079 }
1080
1081 // Add the consumers channel to the map
1082 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1083
1084 //Start a consumers to listen on that specific topic
1085 go sc.startConsumers(topic)
1086
1087 return consumerListeningChannel, nil
1088}
1089
1090// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1091// for that topic. It also starts the routine that listens for messages on that topic.
1092func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1093 // TODO: Replace this development partition consumers with a group consumers
1094 var pConsumer *scc.Consumer
1095 var err error
1096 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001097 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001098 return nil, err
1099 }
1100 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1101 // unbuffered to verify race conditions.
1102 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1103 cc := &consumerChannels{
1104 consumers: []interface{}{pConsumer},
1105 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1106 }
1107
1108 // Add the consumers channel to the map
1109 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1110
1111 //Start a consumers to listen on that specific topic
1112 go sc.startConsumers(topic)
1113
1114 return consumerListeningChannel, nil
1115}
1116
1117func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +00001118 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001119 partitionList, err := sc.consumer.Partitions(topic.Name)
1120 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001121 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001122 return nil, err
1123 }
1124
1125 pConsumers := make([]sarama.PartitionConsumer, 0)
1126 for _, partition := range partitionList {
1127 var pConsumer sarama.PartitionConsumer
1128 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001129 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001130 return nil, err
1131 }
1132 pConsumers = append(pConsumers, pConsumer)
1133 }
1134 return pConsumers, nil
1135}
1136
1137func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1138 var i int
1139 var channel chan *ic.InterContainerMessage
1140 for i, channel = range channels {
1141 if channel == ch {
1142 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1143 close(channel)
Esin Karamanccb714b2019-11-29 15:02:06 +00001144 logger.Debug("channel-closed")
William Kurkianea869482019-04-09 15:16:11 -04001145 return channels[:len(channels)-1]
1146 }
1147 }
1148 return channels
1149}
1150
William Kurkianea869482019-04-09 15:16:11 -04001151func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1152 sc.lockOfGroupConsumers.Lock()
1153 defer sc.lockOfGroupConsumers.Unlock()
1154 if _, exist := sc.groupConsumers[topic]; !exist {
1155 sc.groupConsumers[topic] = consumer
1156 }
1157}
1158
1159func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1160 sc.lockOfGroupConsumers.Lock()
1161 defer sc.lockOfGroupConsumers.Unlock()
1162 if _, exist := sc.groupConsumers[topic]; exist {
1163 consumer := sc.groupConsumers[topic]
1164 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001165 if err := consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001166 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001167 return err
1168 }
1169 }
1170 return nil
1171}