blob: 468e546ec4222f304dd2a99ab40d454fa34995e5 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
Esin Karamanccb714b2019-11-29 15:02:06 +000022 "strings"
23 "sync"
24 "time"
25
kdarapub26b4502019-10-05 03:02:33 +053026 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040027 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000028 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040029 "github.com/golang/protobuf/proto"
Scott Bakered4a8e72020-04-17 11:10:20 -070030 "github.com/golang/protobuf/ptypes"
William Kurkianea869482019-04-09 15:16:11 -040031 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000032 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040034)
35
William Kurkianea869482019-04-09 15:16:11 -040036// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
38//consumer or a group consumer
39type consumerChannels struct {
40 consumers []interface{}
41 channels []chan *ic.InterContainerMessage
42}
43
npujarec5762e2020-01-01 14:08:48 +053044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
William Kurkianea869482019-04-09 15:16:11 -040047// SaramaClient represents the messaging proxy
48type SaramaClient struct {
49 cAdmin sarama.ClusterAdmin
William Kurkianea869482019-04-09 15:16:11 -040050 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040055 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040056 consumerGroupPrefix string
57 consumerType int
58 consumerGroupName string
59 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
71 doneCh chan int
Scott Bakered4a8e72020-04-17 11:10:20 -070072 metadataCallback func(fromTopic string, timestamp time.Time)
William Kurkianea869482019-04-09 15:16:11 -040073 topicToConsumerChannelMap map[string]*consumerChannels
74 lockTopicToConsumerChannelMap sync.RWMutex
75 topicLockMap map[string]*sync.RWMutex
76 lockOfTopicLockMap sync.RWMutex
Mahir Gunyele77977b2019-06-27 05:36:22 -070077 metadataMaxRetry int
cbabu95f21522019-11-13 14:25:18 +010078 alive bool
79 liveness chan bool
80 livenessChannelInterval time.Duration
81 lastLivenessTime time.Time
82 started bool
Scott Baker86fce9a2019-12-12 09:47:17 -080083 healthy bool
84 healthiness chan bool
William Kurkianea869482019-04-09 15:16:11 -040085}
86
87type SaramaClientOption func(*SaramaClient)
88
89func Host(host string) SaramaClientOption {
90 return func(args *SaramaClient) {
91 args.KafkaHost = host
92 }
93}
94
95func Port(port int) SaramaClientOption {
96 return func(args *SaramaClient) {
97 args.KafkaPort = port
98 }
99}
100
101func ConsumerGroupPrefix(prefix string) SaramaClientOption {
102 return func(args *SaramaClient) {
103 args.consumerGroupPrefix = prefix
104 }
105}
106
107func ConsumerGroupName(name string) SaramaClientOption {
108 return func(args *SaramaClient) {
109 args.consumerGroupName = name
110 }
111}
112
113func ConsumerType(consumer int) SaramaClientOption {
114 return func(args *SaramaClient) {
115 args.consumerType = consumer
116 }
117}
118
119func ProducerFlushFrequency(frequency int) SaramaClientOption {
120 return func(args *SaramaClient) {
121 args.producerFlushFrequency = frequency
122 }
123}
124
125func ProducerFlushMessages(num int) SaramaClientOption {
126 return func(args *SaramaClient) {
127 args.producerFlushMessages = num
128 }
129}
130
131func ProducerFlushMaxMessages(num int) SaramaClientOption {
132 return func(args *SaramaClient) {
133 args.producerFlushMaxmessages = num
134 }
135}
136
137func ProducerMaxRetries(num int) SaramaClientOption {
138 return func(args *SaramaClient) {
139 args.producerRetryMax = num
140 }
141}
142
143func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
144 return func(args *SaramaClient) {
145 args.producerRetryBackOff = duration
146 }
147}
148
149func ProducerReturnOnErrors(opt bool) SaramaClientOption {
150 return func(args *SaramaClient) {
151 args.producerReturnErrors = opt
152 }
153}
154
155func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
156 return func(args *SaramaClient) {
157 args.producerReturnSuccess = opt
158 }
159}
160
161func ConsumerMaxWait(wait int) SaramaClientOption {
162 return func(args *SaramaClient) {
163 args.consumerMaxwait = wait
164 }
165}
166
167func MaxProcessingTime(pTime int) SaramaClientOption {
168 return func(args *SaramaClient) {
169 args.maxProcessingTime = pTime
170 }
171}
172
173func NumPartitions(number int) SaramaClientOption {
174 return func(args *SaramaClient) {
175 args.numPartitions = number
176 }
177}
178
179func NumReplicas(number int) SaramaClientOption {
180 return func(args *SaramaClient) {
181 args.numReplicas = number
182 }
183}
184
185func AutoCreateTopic(opt bool) SaramaClientOption {
186 return func(args *SaramaClient) {
187 args.autoCreateTopic = opt
188 }
189}
190
Mahir Gunyele77977b2019-06-27 05:36:22 -0700191func MetadatMaxRetries(retry int) SaramaClientOption {
192 return func(args *SaramaClient) {
193 args.metadataMaxRetry = retry
194 }
195}
196
cbabu95f21522019-11-13 14:25:18 +0100197func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
198 return func(args *SaramaClient) {
199 args.livenessChannelInterval = opt
200 }
201}
202
William Kurkianea869482019-04-09 15:16:11 -0400203func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
204 client := &SaramaClient{
205 KafkaHost: DefaultKafkaHost,
206 KafkaPort: DefaultKafkaPort,
207 }
208 client.consumerType = DefaultConsumerType
209 client.producerFlushFrequency = DefaultProducerFlushFrequency
210 client.producerFlushMessages = DefaultProducerFlushMessages
211 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
212 client.producerReturnErrors = DefaultProducerReturnErrors
213 client.producerReturnSuccess = DefaultProducerReturnSuccess
214 client.producerRetryMax = DefaultProducerRetryMax
215 client.producerRetryBackOff = DefaultProducerRetryBackoff
216 client.consumerMaxwait = DefaultConsumerMaxwait
217 client.maxProcessingTime = DefaultMaxProcessingTime
218 client.numPartitions = DefaultNumberPartitions
219 client.numReplicas = DefaultNumberReplicas
220 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700221 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100222 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400223
224 for _, option := range opts {
225 option(client)
226 }
227
228 client.groupConsumers = make(map[string]*scc.Consumer)
229
230 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
231 client.topicLockMap = make(map[string]*sync.RWMutex)
232 client.lockOfTopicLockMap = sync.RWMutex{}
233 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100234
Scott Baker86fce9a2019-12-12 09:47:17 -0800235 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100236 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800237 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100238
William Kurkianea869482019-04-09 15:16:11 -0400239 return client
240}
241
242func (sc *SaramaClient) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000243 logger.Info("Starting-kafka-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400244
245 // Create the Done channel
246 sc.doneCh = make(chan int, 1)
247
248 var err error
249
Devmalya Paul495b94a2019-08-27 19:42:00 -0400250 // Add a cleanup in case of failure to startup
251 defer func() {
252 if err != nil {
253 sc.Stop()
254 }
255 }()
256
William Kurkianea869482019-04-09 15:16:11 -0400257 // Create the Cluster Admin
258 if err = sc.createClusterAdmin(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000259 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400260 return err
261 }
262
263 // Create the Publisher
264 if err := sc.createPublisher(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000265 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400266 return err
267 }
268
269 if sc.consumerType == DefaultConsumerType {
270 // Create the master consumers
271 if err := sc.createConsumer(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000272 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400273 return err
274 }
275 }
276
277 // Create the topic to consumers/channel map
278 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
279
Esin Karamanccb714b2019-11-29 15:02:06 +0000280 logger.Info("kafka-sarama-client-started")
William Kurkianea869482019-04-09 15:16:11 -0400281
cbabu95f21522019-11-13 14:25:18 +0100282 sc.started = true
283
William Kurkianea869482019-04-09 15:16:11 -0400284 return nil
285}
286
287func (sc *SaramaClient) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000288 logger.Info("stopping-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400289
cbabu95f21522019-11-13 14:25:18 +0100290 sc.started = false
291
William Kurkianea869482019-04-09 15:16:11 -0400292 //Send a message over the done channel to close all long running routines
293 sc.doneCh <- 1
294
295 if sc.producer != nil {
296 if err := sc.producer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000297 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400298 }
299 }
300
301 if sc.consumer != nil {
302 if err := sc.consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000303 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400304 }
305 }
306
307 for key, val := range sc.groupConsumers {
Esin Karamanccb714b2019-11-29 15:02:06 +0000308 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400309 if err := val.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000310 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400311 }
312 }
313
314 if sc.cAdmin != nil {
315 if err := sc.cAdmin.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000316 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400317 }
318 }
319
320 //TODO: Clear the consumers map
321 //sc.clearConsumerChannelMap()
322
Esin Karamanccb714b2019-11-29 15:02:06 +0000323 logger.Info("sarama-client-stopped")
William Kurkianea869482019-04-09 15:16:11 -0400324}
325
326//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
327// the invoking function must hold the lock
328func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
329 // Set the topic details
330 topicDetail := &sarama.TopicDetail{}
331 topicDetail.NumPartitions = int32(numPartition)
332 topicDetail.ReplicationFactor = int16(repFactor)
333 topicDetail.ConfigEntries = make(map[string]*string)
334 topicDetails := make(map[string]*sarama.TopicDetail)
335 topicDetails[topic.Name] = topicDetail
336
337 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
338 if err == sarama.ErrTopicAlreadyExists {
339 // Not an error
Esin Karamanccb714b2019-11-29 15:02:06 +0000340 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400341 return nil
342 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000343 logger.Errorw("create-topic-failure", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400344 return err
345 }
346 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
347 // do so.
Esin Karamanccb714b2019-11-29 15:02:06 +0000348 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
William Kurkianea869482019-04-09 15:16:11 -0400349 return nil
350}
351
352//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
353// ensure no two go routines are performing operations on the same topic
354func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
358 return sc.createTopic(topic, numPartition, repFactor)
359}
360
361//DeleteTopic removes a topic from the kafka Broker
362func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
363 sc.lockTopic(topic)
364 defer sc.unLockTopic(topic)
365
366 // Remove the topic from the broker
367 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
368 if err == sarama.ErrUnknownTopicOrPartition {
369 // Not an error as does not exist
Esin Karamanccb714b2019-11-29 15:02:06 +0000370 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400371 return nil
372 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000373 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400374 return err
375 }
376
377 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
378 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000379 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400380 return err
381 }
382 return nil
383}
384
385// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
386// messages from that topic
387func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
388 sc.lockTopic(topic)
389 defer sc.unLockTopic(topic)
390
Esin Karamanccb714b2019-11-29 15:02:06 +0000391 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400392
393 // If a consumers already exist for that topic then resuse it
394 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000395 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400396 // Create a channel specific for that consumers and add it to the consumers channel map
397 ch := make(chan *ic.InterContainerMessage)
398 sc.addChannelToConsumerChannelMap(topic, ch)
399 return ch, nil
400 }
401
402 // Register for the topic and set it up
403 var consumerListeningChannel chan *ic.InterContainerMessage
404 var err error
405
406 // Use the consumerType option to figure out the type of consumer to launch
407 if sc.consumerType == PartitionConsumer {
408 if sc.autoCreateTopic {
409 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000410 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400411 return nil, err
412 }
413 }
414 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000415 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400416 return nil, err
417 }
418 } else if sc.consumerType == GroupCustomer {
419 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
420 // does not consume from a precreated topic in some scenarios
421 //if sc.autoCreateTopic {
422 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000423 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400424 // return nil, err
425 // }
426 //}
427 //groupId := sc.consumerGroupName
428 groupId := getGroupId(kvArgs...)
429 // Include the group prefix
430 if groupId != "" {
431 groupId = sc.consumerGroupPrefix + groupId
432 } else {
433 // Need to use a unique group Id per topic
434 groupId = sc.consumerGroupPrefix + topic.Name
435 }
436 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000437 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400438 return nil, err
439 }
440
441 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000442 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
William Kurkianea869482019-04-09 15:16:11 -0400443 return nil, errors.New("unknown-consumer-type")
444 }
445
446 return consumerListeningChannel, nil
447}
448
449//UnSubscribe unsubscribe a consumer from a given topic
450func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
451 sc.lockTopic(topic)
452 defer sc.unLockTopic(topic)
453
Esin Karamanccb714b2019-11-29 15:02:06 +0000454 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400455 var err error
456 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000457 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400458 }
459 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000460 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400461 }
462 return err
463}
464
Scott Bakered4a8e72020-04-17 11:10:20 -0700465func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
npujarec5762e2020-01-01 14:08:48 +0530466 sc.metadataCallback = callback
467}
468
cbabu95f21522019-11-13 14:25:18 +0100469func (sc *SaramaClient) updateLiveness(alive bool) {
470 // Post a consistent stream of liveness data to the channel,
471 // so that in a live state, the core does not timeout and
472 // send a forced liveness message. Production of liveness
473 // events to the channel is rate-limited by livenessChannelInterval.
474 if sc.liveness != nil {
475 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000476 logger.Info("update-liveness-channel-because-change")
cbabu95f21522019-11-13 14:25:18 +0100477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000479 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Esin Karamanccb714b2019-11-29 15:02:06 +0000480 logger.Info("update-liveness-channel-because-interval")
cbabu95f21522019-11-13 14:25:18 +0100481 sc.liveness <- alive
482 sc.lastLivenessTime = time.Now()
483 }
484 }
485
486 // Only emit a log message when the state changes
487 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000488 logger.Info("set-client-alive", log.Fields{"alive": alive})
cbabu95f21522019-11-13 14:25:18 +0100489 sc.alive = alive
490 }
491}
492
Scott Baker86fce9a2019-12-12 09:47:17 -0800493// Once unhealthy, we never go back
494func (sc *SaramaClient) setUnhealthy() {
495 sc.healthy = false
496 if sc.healthiness != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000497 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker86fce9a2019-12-12 09:47:17 -0800498 sc.healthiness <- sc.healthy
499 }
500}
501
Devmalya Pauldd23a992019-11-14 07:06:31 +0000502func (sc *SaramaClient) isLivenessError(err error) bool {
503 // Sarama producers and consumers encapsulate the error inside
504 // a ProducerError or ConsumerError struct.
505 if prodError, ok := err.(*sarama.ProducerError); ok {
506 err = prodError.Err
507 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
508 err = consumerError.Err
509 }
510
511 // Sarama-Cluster will compose the error into a ClusterError struct,
512 // which we can't do a compare by reference. To handle that, we the
513 // best we can do is compare the error strings.
514
515 switch err.Error() {
516 case context.DeadlineExceeded.Error():
Esin Karamanccb714b2019-11-29 15:02:06 +0000517 logger.Info("is-liveness-error-timeout")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000518 return true
519 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Esin Karamanccb714b2019-11-29 15:02:06 +0000520 logger.Info("is-liveness-error-no-brokers")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000521 return true
522 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Esin Karamanccb714b2019-11-29 15:02:06 +0000523 logger.Info("is-liveness-error-shutting-down")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000524 return true
525 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Esin Karamanccb714b2019-11-29 15:02:06 +0000526 logger.Info("is-liveness-error-not-available")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000527 return true
528 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Esin Karamanccb714b2019-11-29 15:02:06 +0000529 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000530 return true
531 }
532
533 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Esin Karamanccb714b2019-11-29 15:02:06 +0000534 logger.Info("is-liveness-error-connection-refused")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000535 return true
536 }
537
Scott Bakeree7c0a02020-01-07 11:12:26 -0800538 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Esin Karamanccb714b2019-11-29 15:02:06 +0000539 logger.Info("is-liveness-error-io-timeout")
Scott Bakeree7c0a02020-01-07 11:12:26 -0800540 return true
541 }
542
Devmalya Pauldd23a992019-11-14 07:06:31 +0000543 // Other errors shouldn't trigger a loss of liveness
544
Esin Karamanccb714b2019-11-29 15:02:06 +0000545 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000546
547 return false
548}
549
William Kurkianea869482019-04-09 15:16:11 -0400550// send formats and sends the request onto the kafka messaging bus.
551func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
552
553 // Assert message is a proto message
554 var protoMsg proto.Message
555 var ok bool
556 // ascertain the value interface type is a proto.Message
557 if protoMsg, ok = msg.(proto.Message); !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000558 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000559 return fmt.Errorf("not-a-proto-msg-%s", msg)
William Kurkianea869482019-04-09 15:16:11 -0400560 }
561
562 var marshalled []byte
563 var err error
564 // Create the Sarama producer message
565 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000566 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400567 return err
568 }
569 key := ""
570 if len(keys) > 0 {
571 key = keys[0] // Only the first key is relevant
572 }
573 kafkaMsg := &sarama.ProducerMessage{
574 Topic: topic.Name,
575 Key: sarama.StringEncoder(key),
576 Value: sarama.ByteEncoder(marshalled),
577 }
578
579 // Send message to kafka
580 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400581 // Wait for result
582 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
583 select {
584 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000585 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100586 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -0400587 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000588 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000589 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100590 sc.updateLiveness(false)
591 }
592 return notOk
593 }
594 return nil
595}
596
597// Enable the liveness monitor channel. This channel will report
598// a "true" or "false" on every publish, which indicates whether
599// or not the channel is still live. This channel is then picked up
600// by the service (i.e. rw_core / ro_core) to update readiness status
601// and/or take other actions.
602func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000603 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
cbabu95f21522019-11-13 14:25:18 +0100604 if enable {
605 if sc.liveness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000606 logger.Info("kafka-create-liveness-channel")
cbabu95f21522019-11-13 14:25:18 +0100607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612 // post intial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
Scott Baker86fce9a2019-12-12 09:47:17 -0800623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
626func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000627 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker86fce9a2019-12-12 09:47:17 -0800628 if enable {
629 if sc.healthiness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000630 logger.Info("kafka-create-healthiness-channel")
Scott Baker86fce9a2019-12-12 09:47:17 -0800631 // At least 1, so we can immediately post to it without blocking
632 // Setting a bigger number (10) allows the monitor to fall behind
633 // without blocking others. The monitor shouldn't really fall
634 // behind...
635 sc.healthiness = make(chan bool, 10)
636 // post intial state to the channel
637 sc.healthiness <- sc.healthy
638 }
639 } else {
640 // TODO: Think about whether we need the ability to turn off
641 // liveness monitoring
642 panic("Turning off healthiness reporting is not supported")
643 }
644 return sc.healthiness
645}
646
cbabu95f21522019-11-13 14:25:18 +0100647// send an empty message on the liveness channel to check whether connectivity has
648// been restored.
649func (sc *SaramaClient) SendLiveness() error {
650 if !sc.started {
651 return fmt.Errorf("SendLiveness() called while not started")
652 }
653
654 kafkaMsg := &sarama.ProducerMessage{
655 Topic: "_liveness_test",
656 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
657 }
658
659 // Send message to kafka
660 sc.producer.Input() <- kafkaMsg
661 // Wait for result
662 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
663 select {
664 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000665 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100666 sc.updateLiveness(true)
667 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000668 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000669 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100670 sc.updateLiveness(false)
671 }
William Kurkianea869482019-04-09 15:16:11 -0400672 return notOk
673 }
674 return nil
675}
676
677// getGroupId returns the group id from the key-value args.
678func getGroupId(kvArgs ...*KVArg) string {
679 for _, arg := range kvArgs {
680 if arg.Key == GroupIdKey {
681 return arg.Value.(string)
682 }
683 }
684 return ""
685}
686
687// getOffset returns the offset from the key-value args.
688func getOffset(kvArgs ...*KVArg) int64 {
689 for _, arg := range kvArgs {
690 if arg.Key == Offset {
691 return arg.Value.(int64)
692 }
693 }
694 return sarama.OffsetNewest
695}
696
697func (sc *SaramaClient) createClusterAdmin() error {
698 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
699 config := sarama.NewConfig()
700 config.Version = sarama.V1_0_0_0
701
702 // Create a cluster Admin
703 var cAdmin sarama.ClusterAdmin
704 var err error
705 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000706 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
William Kurkianea869482019-04-09 15:16:11 -0400707 return err
708 }
709 sc.cAdmin = cAdmin
710 return nil
711}
712
713func (sc *SaramaClient) lockTopic(topic *Topic) {
714 sc.lockOfTopicLockMap.Lock()
715 if _, exist := sc.topicLockMap[topic.Name]; exist {
716 sc.lockOfTopicLockMap.Unlock()
717 sc.topicLockMap[topic.Name].Lock()
718 } else {
719 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
720 sc.lockOfTopicLockMap.Unlock()
721 sc.topicLockMap[topic.Name].Lock()
722 }
723}
724
725func (sc *SaramaClient) unLockTopic(topic *Topic) {
726 sc.lockOfTopicLockMap.Lock()
727 defer sc.lockOfTopicLockMap.Unlock()
728 if _, exist := sc.topicLockMap[topic.Name]; exist {
729 sc.topicLockMap[topic.Name].Unlock()
730 }
731}
732
733func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
734 sc.lockTopicToConsumerChannelMap.Lock()
735 defer sc.lockTopicToConsumerChannelMap.Unlock()
736 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
737 sc.topicToConsumerChannelMap[id] = arg
738 }
739}
740
William Kurkianea869482019-04-09 15:16:11 -0400741func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
742 sc.lockTopicToConsumerChannelMap.RLock()
743 defer sc.lockTopicToConsumerChannelMap.RUnlock()
744
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 return consumerCh
747 }
748 return nil
749}
750
751func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
752 sc.lockTopicToConsumerChannelMap.Lock()
753 defer sc.lockTopicToConsumerChannelMap.Unlock()
754 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
755 consumerCh.channels = append(consumerCh.channels, ch)
756 return
757 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000758 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400759}
760
761//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
762func closeConsumers(consumers []interface{}) error {
763 var err error
764 for _, consumer := range consumers {
765 // Is it a partition consumers?
766 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
767 if errTemp := partionConsumer.Close(); errTemp != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000768 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
William Kurkianea869482019-04-09 15:16:11 -0400769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
777 if errTemp := groupConsumer.Close(); errTemp != nil {
778 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
779 // This can occur on race condition
780 err = nil
781 } else {
782 err = errTemp
783 }
784 }
785 }
786 }
787 return err
788}
789
790func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
791 sc.lockTopicToConsumerChannelMap.Lock()
792 defer sc.lockTopicToConsumerChannelMap.Unlock()
793 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
794 // Channel will be closed in the removeChannel method
795 consumerCh.channels = removeChannel(consumerCh.channels, ch)
796 // If there are no more channels then we can close the consumers itself
797 if len(consumerCh.channels) == 0 {
Esin Karamanccb714b2019-11-29 15:02:06 +0000798 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400799 err := closeConsumers(consumerCh.consumers)
800 //err := consumerCh.consumers.Close()
801 delete(sc.topicToConsumerChannelMap, topic.Name)
802 return err
803 }
804 return nil
805 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000806 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400807 return errors.New("topic-does-not-exist")
808}
809
810func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
811 sc.lockTopicToConsumerChannelMap.Lock()
812 defer sc.lockTopicToConsumerChannelMap.Unlock()
813 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
814 for _, ch := range consumerCh.channels {
815 // Channel will be closed in the removeChannel method
816 removeChannel(consumerCh.channels, ch)
817 }
818 err := closeConsumers(consumerCh.consumers)
819 //if err == sarama.ErrUnknownTopicOrPartition {
820 // // Not an error
821 // err = nil
822 //}
823 //err := consumerCh.consumers.Close()
824 delete(sc.topicToConsumerChannelMap, topic.Name)
825 return err
826 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000827 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400828 return nil
829}
830
William Kurkianea869482019-04-09 15:16:11 -0400831//createPublisher creates the publisher which is used to send a message onto kafka
832func (sc *SaramaClient) createPublisher() error {
833 // This Creates the publisher
834 config := sarama.NewConfig()
835 config.Producer.Partitioner = sarama.NewRandomPartitioner
836 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
837 config.Producer.Flush.Messages = sc.producerFlushMessages
838 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
839 config.Producer.Return.Errors = sc.producerReturnErrors
840 config.Producer.Return.Successes = sc.producerReturnSuccess
841 //config.Producer.RequiredAcks = sarama.WaitForAll
842 config.Producer.RequiredAcks = sarama.WaitForLocal
843
844 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
845 brokers := []string{kafkaFullAddr}
846
847 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000848 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400849 return err
850 } else {
851 sc.producer = producer
852 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000853 logger.Info("Kafka-publisher-created")
William Kurkianea869482019-04-09 15:16:11 -0400854 return nil
855}
856
857func (sc *SaramaClient) createConsumer() error {
858 config := sarama.NewConfig()
859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700864 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400865 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
866 brokers := []string{kafkaFullAddr}
867
868 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000869 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400870 return err
871 } else {
872 sc.consumer = consumer
873 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000874 logger.Info("Kafka-consumers-created")
William Kurkianea869482019-04-09 15:16:11 -0400875 return nil
876}
877
878// createGroupConsumer creates a consumers group
879func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
880 config := scc.NewConfig()
881 config.ClientID = uuid.New().String()
882 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100883 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
884 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400885 //config.Group.Return.Notifications = false
886 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
887 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
888 config.Consumer.Offsets.Initial = initialOffset
889 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
890 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
891 brokers := []string{kafkaFullAddr}
892
893 topics := []string{topic.Name}
894 var consumer *scc.Consumer
895 var err error
896
897 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000898 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400899 return nil, err
900 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000901 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400902
903 //sc.groupConsumers[topic.Name] = consumer
904 sc.addToGroupConsumers(topic.Name, consumer)
905 return consumer, nil
906}
907
908// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
909// topic via the unique channel each subscriber received during subscription
910func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
911 // Need to go over all channels and publish messages to them - do we need to copy msg?
912 sc.lockTopicToConsumerChannelMap.RLock()
William Kurkianea869482019-04-09 15:16:11 -0400913 for _, ch := range consumerCh.channels {
914 go func(c chan *ic.InterContainerMessage) {
915 c <- protoMessage
916 }(ch)
917 }
npujarec5762e2020-01-01 14:08:48 +0530918 sc.lockTopicToConsumerChannelMap.RUnlock()
919
920 if callback := sc.metadataCallback; callback != nil {
Scott Bakered4a8e72020-04-17 11:10:20 -0700921 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
922 callback(protoMessage.Header.FromTopic, ts)
npujarec5762e2020-01-01 14:08:48 +0530923 }
William Kurkianea869482019-04-09 15:16:11 -0400924}
925
926func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000927 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400928startloop:
929 for {
930 select {
931 case err, ok := <-consumer.Errors():
932 if ok {
cbabu116b73f2019-12-10 17:56:32 +0530933 if sc.isLivenessError(err) {
934 sc.updateLiveness(false)
Esin Karamanccb714b2019-11-29 15:02:06 +0000935 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530936 }
William Kurkianea869482019-04-09 15:16:11 -0400937 } else {
938 // Channel is closed
939 break startloop
940 }
941 case msg, ok := <-consumer.Messages():
Esin Karamanccb714b2019-11-29 15:02:06 +0000942 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400943 if !ok {
944 // channel is closed
945 break startloop
946 }
947 msgBody := msg.Value
cbabu116b73f2019-12-10 17:56:32 +0530948 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000949 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400950 icm := &ic.InterContainerMessage{}
951 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000952 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400953 continue
954 }
955 go sc.dispatchToConsumers(consumerChnls, icm)
956 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000957 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400958 break startloop
959 }
960 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000961 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -0800962 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -0400963}
964
965func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000966 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400967
968startloop:
969 for {
970 select {
971 case err, ok := <-consumer.Errors():
972 if ok {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000973 if sc.isLivenessError(err) {
974 sc.updateLiveness(false)
975 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000976 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400977 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000978 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400979 // channel is closed
980 break startloop
981 }
982 case msg, ok := <-consumer.Messages():
983 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000984 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400985 // Channel closed
986 break startloop
987 }
cbabu95f21522019-11-13 14:25:18 +0100988 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000989 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400990 msgBody := msg.Value
991 icm := &ic.InterContainerMessage{}
992 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000993 logger.Warnw("invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400994 continue
995 }
996 go sc.dispatchToConsumers(consumerChnls, icm)
997 consumer.MarkOffset(msg, "")
998 case ntf := <-consumer.Notifications():
Esin Karamanccb714b2019-11-29 15:02:06 +0000999 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
William Kurkianea869482019-04-09 15:16:11 -04001000 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +00001001 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001002 break startloop
1003 }
1004 }
Esin Karamanccb714b2019-11-29 15:02:06 +00001005 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -08001006 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -04001007}
1008
1009func (sc *SaramaClient) startConsumers(topic *Topic) error {
Esin Karamanccb714b2019-11-29 15:02:06 +00001010 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001011 var consumerCh *consumerChannels
1012 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001013 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001014 return errors.New("consumers-not-exist")
1015 }
1016 // For each consumer listening for that topic, start a consumption loop
1017 for _, consumer := range consumerCh.consumers {
1018 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1019 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1020 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1021 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1022 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +00001023 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -04001024 return errors.New("invalid-consumer")
1025 }
1026 }
1027 return nil
1028}
1029
1030//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1031//// for that topic. It also starts the routine that listens for messages on that topic.
1032func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1033 var pConsumers []sarama.PartitionConsumer
1034 var err error
1035
1036 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001037 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001038 return nil, err
1039 }
1040
1041 consumersIf := make([]interface{}, 0)
1042 for _, pConsumer := range pConsumers {
1043 consumersIf = append(consumersIf, pConsumer)
1044 }
1045
1046 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1047 // unbuffered to verify race conditions.
1048 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1049 cc := &consumerChannels{
1050 consumers: consumersIf,
1051 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1052 }
1053
1054 // Add the consumers channel to the map
1055 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1056
1057 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001058 go func() {
1059 if err := sc.startConsumers(topic); err != nil {
1060 logger.Errorw("start-consumers-failed", log.Fields{
1061 "topic": topic,
1062 "error": err})
1063 }
1064 }()
William Kurkianea869482019-04-09 15:16:11 -04001065
1066 return consumerListeningChannel, nil
1067}
1068
1069// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1070// for that topic. It also starts the routine that listens for messages on that topic.
1071func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1072 // TODO: Replace this development partition consumers with a group consumers
1073 var pConsumer *scc.Consumer
1074 var err error
1075 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001076 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001077 return nil, err
1078 }
1079 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1080 // unbuffered to verify race conditions.
1081 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1082 cc := &consumerChannels{
1083 consumers: []interface{}{pConsumer},
1084 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1085 }
1086
1087 // Add the consumers channel to the map
1088 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1089
1090 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001091 go func() {
1092 if err := sc.startConsumers(topic); err != nil {
1093 logger.Errorw("start-consumers-failed", log.Fields{
1094 "topic": topic,
1095 "error": err})
1096 }
1097 }()
William Kurkianea869482019-04-09 15:16:11 -04001098
1099 return consumerListeningChannel, nil
1100}
1101
1102func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +00001103 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001104 partitionList, err := sc.consumer.Partitions(topic.Name)
1105 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001106 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001107 return nil, err
1108 }
1109
1110 pConsumers := make([]sarama.PartitionConsumer, 0)
1111 for _, partition := range partitionList {
1112 var pConsumer sarama.PartitionConsumer
1113 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001114 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001115 return nil, err
1116 }
1117 pConsumers = append(pConsumers, pConsumer)
1118 }
1119 return pConsumers, nil
1120}
1121
1122func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1123 var i int
1124 var channel chan *ic.InterContainerMessage
1125 for i, channel = range channels {
1126 if channel == ch {
1127 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1128 close(channel)
Esin Karamanccb714b2019-11-29 15:02:06 +00001129 logger.Debug("channel-closed")
William Kurkianea869482019-04-09 15:16:11 -04001130 return channels[:len(channels)-1]
1131 }
1132 }
1133 return channels
1134}
1135
William Kurkianea869482019-04-09 15:16:11 -04001136func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1137 sc.lockOfGroupConsumers.Lock()
1138 defer sc.lockOfGroupConsumers.Unlock()
1139 if _, exist := sc.groupConsumers[topic]; !exist {
1140 sc.groupConsumers[topic] = consumer
1141 }
1142}
1143
1144func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1145 sc.lockOfGroupConsumers.Lock()
1146 defer sc.lockOfGroupConsumers.Unlock()
1147 if _, exist := sc.groupConsumers[topic]; exist {
1148 consumer := sc.groupConsumers[topic]
1149 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001150 if err := consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001151 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001152 return err
1153 }
1154 }
1155 return nil
1156}