blob: 468e546ec4222f304dd2a99ab40d454fa34995e5 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "strings"
23 "sync"
24 "time"
25
Scott Baker2c1c4822019-10-16 11:02:41 -070026 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
Scott Baker84a55ce2020-04-17 10:11:30 -070030 "github.com/golang/protobuf/ptypes"
Scott Baker2c1c4822019-10-16 11:02:41 -070031 "github.com/google/uuid"
serkant.uluderyab38671c2019-11-01 09:35:38 -070032 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070034)
35
Scott Baker2c1c4822019-10-16 11:02:41 -070036// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
38//consumer or a group consumer
39type consumerChannels struct {
40 consumers []interface{}
41 channels []chan *ic.InterContainerMessage
42}
43
Kent Hagermanccfa2132019-12-17 13:29:34 -050044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
Scott Baker2c1c4822019-10-16 11:02:41 -070047// SaramaClient represents the messaging proxy
48type SaramaClient struct {
49 cAdmin sarama.ClusterAdmin
Scott Baker2c1c4822019-10-16 11:02:41 -070050 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumers map[string]*scc.Consumer
55 lockOfGroupConsumers sync.RWMutex
56 consumerGroupPrefix string
57 consumerType int
58 consumerGroupName string
59 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
71 doneCh chan int
Scott Baker84a55ce2020-04-17 10:11:30 -070072 metadataCallback func(fromTopic string, timestamp time.Time)
Scott Baker2c1c4822019-10-16 11:02:41 -070073 topicToConsumerChannelMap map[string]*consumerChannels
74 lockTopicToConsumerChannelMap sync.RWMutex
75 topicLockMap map[string]*sync.RWMutex
76 lockOfTopicLockMap sync.RWMutex
77 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070078 alive bool
79 liveness chan bool
80 livenessChannelInterval time.Duration
81 lastLivenessTime time.Time
82 started bool
Scott Baker0fef6982019-12-12 09:49:42 -080083 healthy bool
84 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070085}
86
87type SaramaClientOption func(*SaramaClient)
88
89func Host(host string) SaramaClientOption {
90 return func(args *SaramaClient) {
91 args.KafkaHost = host
92 }
93}
94
95func Port(port int) SaramaClientOption {
96 return func(args *SaramaClient) {
97 args.KafkaPort = port
98 }
99}
100
101func ConsumerGroupPrefix(prefix string) SaramaClientOption {
102 return func(args *SaramaClient) {
103 args.consumerGroupPrefix = prefix
104 }
105}
106
107func ConsumerGroupName(name string) SaramaClientOption {
108 return func(args *SaramaClient) {
109 args.consumerGroupName = name
110 }
111}
112
113func ConsumerType(consumer int) SaramaClientOption {
114 return func(args *SaramaClient) {
115 args.consumerType = consumer
116 }
117}
118
119func ProducerFlushFrequency(frequency int) SaramaClientOption {
120 return func(args *SaramaClient) {
121 args.producerFlushFrequency = frequency
122 }
123}
124
125func ProducerFlushMessages(num int) SaramaClientOption {
126 return func(args *SaramaClient) {
127 args.producerFlushMessages = num
128 }
129}
130
131func ProducerFlushMaxMessages(num int) SaramaClientOption {
132 return func(args *SaramaClient) {
133 args.producerFlushMaxmessages = num
134 }
135}
136
137func ProducerMaxRetries(num int) SaramaClientOption {
138 return func(args *SaramaClient) {
139 args.producerRetryMax = num
140 }
141}
142
143func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
144 return func(args *SaramaClient) {
145 args.producerRetryBackOff = duration
146 }
147}
148
149func ProducerReturnOnErrors(opt bool) SaramaClientOption {
150 return func(args *SaramaClient) {
151 args.producerReturnErrors = opt
152 }
153}
154
155func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
156 return func(args *SaramaClient) {
157 args.producerReturnSuccess = opt
158 }
159}
160
161func ConsumerMaxWait(wait int) SaramaClientOption {
162 return func(args *SaramaClient) {
163 args.consumerMaxwait = wait
164 }
165}
166
167func MaxProcessingTime(pTime int) SaramaClientOption {
168 return func(args *SaramaClient) {
169 args.maxProcessingTime = pTime
170 }
171}
172
173func NumPartitions(number int) SaramaClientOption {
174 return func(args *SaramaClient) {
175 args.numPartitions = number
176 }
177}
178
179func NumReplicas(number int) SaramaClientOption {
180 return func(args *SaramaClient) {
181 args.numReplicas = number
182 }
183}
184
185func AutoCreateTopic(opt bool) SaramaClientOption {
186 return func(args *SaramaClient) {
187 args.autoCreateTopic = opt
188 }
189}
190
191func MetadatMaxRetries(retry int) SaramaClientOption {
192 return func(args *SaramaClient) {
193 args.metadataMaxRetry = retry
194 }
195}
196
Scott Baker104b67d2019-10-29 15:56:27 -0700197func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
198 return func(args *SaramaClient) {
199 args.livenessChannelInterval = opt
200 }
201}
202
Scott Baker2c1c4822019-10-16 11:02:41 -0700203func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
204 client := &SaramaClient{
205 KafkaHost: DefaultKafkaHost,
206 KafkaPort: DefaultKafkaPort,
207 }
208 client.consumerType = DefaultConsumerType
209 client.producerFlushFrequency = DefaultProducerFlushFrequency
210 client.producerFlushMessages = DefaultProducerFlushMessages
211 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
212 client.producerReturnErrors = DefaultProducerReturnErrors
213 client.producerReturnSuccess = DefaultProducerReturnSuccess
214 client.producerRetryMax = DefaultProducerRetryMax
215 client.producerRetryBackOff = DefaultProducerRetryBackoff
216 client.consumerMaxwait = DefaultConsumerMaxwait
217 client.maxProcessingTime = DefaultMaxProcessingTime
218 client.numPartitions = DefaultNumberPartitions
219 client.numReplicas = DefaultNumberReplicas
220 client.autoCreateTopic = DefaultAutoCreateTopic
221 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700222 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700223
224 for _, option := range opts {
225 option(client)
226 }
227
228 client.groupConsumers = make(map[string]*scc.Consumer)
229
230 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
231 client.topicLockMap = make(map[string]*sync.RWMutex)
232 client.lockOfTopicLockMap = sync.RWMutex{}
233 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700234
Scott Baker0fef6982019-12-12 09:49:42 -0800235 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700236 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800237 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700238
Scott Baker2c1c4822019-10-16 11:02:41 -0700239 return client
240}
241
242func (sc *SaramaClient) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500243 logger.Info("Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700244
245 // Create the Done channel
246 sc.doneCh = make(chan int, 1)
247
248 var err error
249
250 // Add a cleanup in case of failure to startup
251 defer func() {
252 if err != nil {
253 sc.Stop()
254 }
255 }()
256
257 // Create the Cluster Admin
258 if err = sc.createClusterAdmin(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500259 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700260 return err
261 }
262
263 // Create the Publisher
264 if err := sc.createPublisher(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500265 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700266 return err
267 }
268
269 if sc.consumerType == DefaultConsumerType {
270 // Create the master consumers
271 if err := sc.createConsumer(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500272 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700273 return err
274 }
275 }
276
277 // Create the topic to consumers/channel map
278 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
279
khenaidoob332f9b2020-01-16 16:25:26 -0500280 logger.Info("kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700281
Scott Baker104b67d2019-10-29 15:56:27 -0700282 sc.started = true
283
Scott Baker2c1c4822019-10-16 11:02:41 -0700284 return nil
285}
286
287func (sc *SaramaClient) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500288 logger.Info("stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700289
Scott Baker104b67d2019-10-29 15:56:27 -0700290 sc.started = false
291
Scott Baker2c1c4822019-10-16 11:02:41 -0700292 //Send a message over the done channel to close all long running routines
293 sc.doneCh <- 1
294
295 if sc.producer != nil {
296 if err := sc.producer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500297 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700298 }
299 }
300
301 if sc.consumer != nil {
302 if err := sc.consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500303 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700304 }
305 }
306
307 for key, val := range sc.groupConsumers {
khenaidoob332f9b2020-01-16 16:25:26 -0500308 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 if err := val.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500310 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700311 }
312 }
313
314 if sc.cAdmin != nil {
315 if err := sc.cAdmin.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500316 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700317 }
318 }
319
320 //TODO: Clear the consumers map
321 //sc.clearConsumerChannelMap()
322
khenaidoob332f9b2020-01-16 16:25:26 -0500323 logger.Info("sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700324}
325
326//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
327// the invoking function must hold the lock
328func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
329 // Set the topic details
330 topicDetail := &sarama.TopicDetail{}
331 topicDetail.NumPartitions = int32(numPartition)
332 topicDetail.ReplicationFactor = int16(repFactor)
333 topicDetail.ConfigEntries = make(map[string]*string)
334 topicDetails := make(map[string]*sarama.TopicDetail)
335 topicDetails[topic.Name] = topicDetail
336
337 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
338 if err == sarama.ErrTopicAlreadyExists {
339 // Not an error
khenaidoob332f9b2020-01-16 16:25:26 -0500340 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 return nil
342 }
khenaidoob332f9b2020-01-16 16:25:26 -0500343 logger.Errorw("create-topic-failure", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 return err
345 }
346 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
347 // do so.
khenaidoob332f9b2020-01-16 16:25:26 -0500348 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700349 return nil
350}
351
352//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
353// ensure no two go routines are performing operations on the same topic
354func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
358 return sc.createTopic(topic, numPartition, repFactor)
359}
360
361//DeleteTopic removes a topic from the kafka Broker
362func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
363 sc.lockTopic(topic)
364 defer sc.unLockTopic(topic)
365
366 // Remove the topic from the broker
367 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
368 if err == sarama.ErrUnknownTopicOrPartition {
369 // Not an error as does not exist
khenaidoob332f9b2020-01-16 16:25:26 -0500370 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700371 return nil
372 }
khenaidoob332f9b2020-01-16 16:25:26 -0500373 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700374 return err
375 }
376
377 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
378 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500379 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700380 return err
381 }
382 return nil
383}
384
385// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
386// messages from that topic
387func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
388 sc.lockTopic(topic)
389 defer sc.unLockTopic(topic)
390
khenaidoob332f9b2020-01-16 16:25:26 -0500391 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700392
393 // If a consumers already exist for that topic then resuse it
394 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500395 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700396 // Create a channel specific for that consumers and add it to the consumers channel map
397 ch := make(chan *ic.InterContainerMessage)
398 sc.addChannelToConsumerChannelMap(topic, ch)
399 return ch, nil
400 }
401
402 // Register for the topic and set it up
403 var consumerListeningChannel chan *ic.InterContainerMessage
404 var err error
405
406 // Use the consumerType option to figure out the type of consumer to launch
407 if sc.consumerType == PartitionConsumer {
408 if sc.autoCreateTopic {
409 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500410 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700411 return nil, err
412 }
413 }
414 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500415 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700416 return nil, err
417 }
418 } else if sc.consumerType == GroupCustomer {
419 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
420 // does not consume from a precreated topic in some scenarios
421 //if sc.autoCreateTopic {
422 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500423 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700424 // return nil, err
425 // }
426 //}
427 //groupId := sc.consumerGroupName
428 groupId := getGroupId(kvArgs...)
429 // Include the group prefix
430 if groupId != "" {
431 groupId = sc.consumerGroupPrefix + groupId
432 } else {
433 // Need to use a unique group Id per topic
434 groupId = sc.consumerGroupPrefix + topic.Name
435 }
436 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500437 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700438 return nil, err
439 }
440
441 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500442 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700443 return nil, errors.New("unknown-consumer-type")
444 }
445
446 return consumerListeningChannel, nil
447}
448
449//UnSubscribe unsubscribe a consumer from a given topic
450func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
451 sc.lockTopic(topic)
452 defer sc.unLockTopic(topic)
453
khenaidoob332f9b2020-01-16 16:25:26 -0500454 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700455 var err error
456 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500457 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700458 }
459 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500460 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700461 }
462 return err
463}
464
Scott Baker84a55ce2020-04-17 10:11:30 -0700465func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
Kent Hagermanccfa2132019-12-17 13:29:34 -0500466 sc.metadataCallback = callback
467}
468
Scott Baker104b67d2019-10-29 15:56:27 -0700469func (sc *SaramaClient) updateLiveness(alive bool) {
470 // Post a consistent stream of liveness data to the channel,
471 // so that in a live state, the core does not timeout and
472 // send a forced liveness message. Production of liveness
473 // events to the channel is rate-limited by livenessChannelInterval.
474 if sc.liveness != nil {
475 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500476 logger.Info("update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800479 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
khenaidoob332f9b2020-01-16 16:25:26 -0500480 logger.Info("update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700481 sc.liveness <- alive
482 sc.lastLivenessTime = time.Now()
483 }
484 }
485
486 // Only emit a log message when the state changes
487 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500488 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700489 sc.alive = alive
490 }
491}
492
Scott Baker0fef6982019-12-12 09:49:42 -0800493// Once unhealthy, we never go back
494func (sc *SaramaClient) setUnhealthy() {
495 sc.healthy = false
496 if sc.healthiness != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500497 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800498 sc.healthiness <- sc.healthy
499 }
500}
501
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800502func (sc *SaramaClient) isLivenessError(err error) bool {
503 // Sarama producers and consumers encapsulate the error inside
504 // a ProducerError or ConsumerError struct.
505 if prodError, ok := err.(*sarama.ProducerError); ok {
506 err = prodError.Err
507 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
508 err = consumerError.Err
509 }
510
511 // Sarama-Cluster will compose the error into a ClusterError struct,
512 // which we can't do a compare by reference. To handle that, we the
513 // best we can do is compare the error strings.
514
515 switch err.Error() {
516 case context.DeadlineExceeded.Error():
khenaidoob332f9b2020-01-16 16:25:26 -0500517 logger.Info("is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800518 return true
519 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
khenaidoob332f9b2020-01-16 16:25:26 -0500520 logger.Info("is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800521 return true
522 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
khenaidoob332f9b2020-01-16 16:25:26 -0500523 logger.Info("is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800524 return true
525 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
khenaidoob332f9b2020-01-16 16:25:26 -0500526 logger.Info("is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800527 return true
528 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
khenaidoob332f9b2020-01-16 16:25:26 -0500529 logger.Info("is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800530 return true
531 }
532
533 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
khenaidoob332f9b2020-01-16 16:25:26 -0500534 logger.Info("is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800535 return true
536 }
537
Scott Baker718bee02020-01-07 09:52:02 -0800538 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
khenaidoob332f9b2020-01-16 16:25:26 -0500539 logger.Info("is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800540 return true
541 }
542
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800543 // Other errors shouldn't trigger a loss of liveness
544
khenaidoob332f9b2020-01-16 16:25:26 -0500545 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800546
547 return false
548}
549
Scott Baker2c1c4822019-10-16 11:02:41 -0700550// send formats and sends the request onto the kafka messaging bus.
551func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
552
553 // Assert message is a proto message
554 var protoMsg proto.Message
555 var ok bool
556 // ascertain the value interface type is a proto.Message
557 if protoMsg, ok = msg.(proto.Message); !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500558 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800559 return fmt.Errorf("not-a-proto-msg-%s", msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700560 }
561
562 var marshalled []byte
563 var err error
564 // Create the Sarama producer message
565 if marshalled, err = proto.Marshal(protoMsg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500566 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700567 return err
568 }
569 key := ""
570 if len(keys) > 0 {
571 key = keys[0] // Only the first key is relevant
572 }
573 kafkaMsg := &sarama.ProducerMessage{
574 Topic: topic.Name,
575 Key: sarama.StringEncoder(key),
576 Value: sarama.ByteEncoder(marshalled),
577 }
578
579 // Send message to kafka
580 sc.producer.Input() <- kafkaMsg
581 // Wait for result
582 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
583 select {
584 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500585 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700586 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700587 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500588 logger.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800589 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700590 sc.updateLiveness(false)
591 }
592 return notOk
593 }
594 return nil
595}
596
597// Enable the liveness monitor channel. This channel will report
598// a "true" or "false" on every publish, which indicates whether
599// or not the channel is still live. This channel is then picked up
600// by the service (i.e. rw_core / ro_core) to update readiness status
601// and/or take other actions.
602func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500603 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700604 if enable {
605 if sc.liveness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500606 logger.Info("kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612 // post intial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
Scott Baker0fef6982019-12-12 09:49:42 -0800623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
626func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500627 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800628 if enable {
629 if sc.healthiness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500630 logger.Info("kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800631 // At least 1, so we can immediately post to it without blocking
632 // Setting a bigger number (10) allows the monitor to fall behind
633 // without blocking others. The monitor shouldn't really fall
634 // behind...
635 sc.healthiness = make(chan bool, 10)
636 // post intial state to the channel
637 sc.healthiness <- sc.healthy
638 }
639 } else {
640 // TODO: Think about whether we need the ability to turn off
641 // liveness monitoring
642 panic("Turning off healthiness reporting is not supported")
643 }
644 return sc.healthiness
645}
646
Scott Baker104b67d2019-10-29 15:56:27 -0700647// send an empty message on the liveness channel to check whether connectivity has
648// been restored.
649func (sc *SaramaClient) SendLiveness() error {
650 if !sc.started {
651 return fmt.Errorf("SendLiveness() called while not started")
652 }
653
654 kafkaMsg := &sarama.ProducerMessage{
655 Topic: "_liveness_test",
656 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
657 }
658
659 // Send message to kafka
660 sc.producer.Input() <- kafkaMsg
661 // Wait for result
662 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
663 select {
664 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500665 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700666 sc.updateLiveness(true)
667 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500668 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800669 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700670 sc.updateLiveness(false)
671 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700672 return notOk
673 }
674 return nil
675}
676
677// getGroupId returns the group id from the key-value args.
678func getGroupId(kvArgs ...*KVArg) string {
679 for _, arg := range kvArgs {
680 if arg.Key == GroupIdKey {
681 return arg.Value.(string)
682 }
683 }
684 return ""
685}
686
687// getOffset returns the offset from the key-value args.
688func getOffset(kvArgs ...*KVArg) int64 {
689 for _, arg := range kvArgs {
690 if arg.Key == Offset {
691 return arg.Value.(int64)
692 }
693 }
694 return sarama.OffsetNewest
695}
696
697func (sc *SaramaClient) createClusterAdmin() error {
698 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
699 config := sarama.NewConfig()
700 config.Version = sarama.V1_0_0_0
701
702 // Create a cluster Admin
703 var cAdmin sarama.ClusterAdmin
704 var err error
705 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500706 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
Scott Baker2c1c4822019-10-16 11:02:41 -0700707 return err
708 }
709 sc.cAdmin = cAdmin
710 return nil
711}
712
713func (sc *SaramaClient) lockTopic(topic *Topic) {
714 sc.lockOfTopicLockMap.Lock()
715 if _, exist := sc.topicLockMap[topic.Name]; exist {
716 sc.lockOfTopicLockMap.Unlock()
717 sc.topicLockMap[topic.Name].Lock()
718 } else {
719 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
720 sc.lockOfTopicLockMap.Unlock()
721 sc.topicLockMap[topic.Name].Lock()
722 }
723}
724
725func (sc *SaramaClient) unLockTopic(topic *Topic) {
726 sc.lockOfTopicLockMap.Lock()
727 defer sc.lockOfTopicLockMap.Unlock()
728 if _, exist := sc.topicLockMap[topic.Name]; exist {
729 sc.topicLockMap[topic.Name].Unlock()
730 }
731}
732
733func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
734 sc.lockTopicToConsumerChannelMap.Lock()
735 defer sc.lockTopicToConsumerChannelMap.Unlock()
736 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
737 sc.topicToConsumerChannelMap[id] = arg
738 }
739}
740
Scott Baker2c1c4822019-10-16 11:02:41 -0700741func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
742 sc.lockTopicToConsumerChannelMap.RLock()
743 defer sc.lockTopicToConsumerChannelMap.RUnlock()
744
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 return consumerCh
747 }
748 return nil
749}
750
751func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
752 sc.lockTopicToConsumerChannelMap.Lock()
753 defer sc.lockTopicToConsumerChannelMap.Unlock()
754 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
755 consumerCh.channels = append(consumerCh.channels, ch)
756 return
757 }
khenaidoob332f9b2020-01-16 16:25:26 -0500758 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700759}
760
761//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
762func closeConsumers(consumers []interface{}) error {
763 var err error
764 for _, consumer := range consumers {
765 // Is it a partition consumers?
766 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
767 if errTemp := partionConsumer.Close(); errTemp != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500768 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
777 if errTemp := groupConsumer.Close(); errTemp != nil {
778 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
779 // This can occur on race condition
780 err = nil
781 } else {
782 err = errTemp
783 }
784 }
785 }
786 }
787 return err
788}
789
790func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
791 sc.lockTopicToConsumerChannelMap.Lock()
792 defer sc.lockTopicToConsumerChannelMap.Unlock()
793 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
794 // Channel will be closed in the removeChannel method
795 consumerCh.channels = removeChannel(consumerCh.channels, ch)
796 // If there are no more channels then we can close the consumers itself
797 if len(consumerCh.channels) == 0 {
khenaidoob332f9b2020-01-16 16:25:26 -0500798 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700799 err := closeConsumers(consumerCh.consumers)
800 //err := consumerCh.consumers.Close()
801 delete(sc.topicToConsumerChannelMap, topic.Name)
802 return err
803 }
804 return nil
805 }
khenaidoob332f9b2020-01-16 16:25:26 -0500806 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700807 return errors.New("topic-does-not-exist")
808}
809
810func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
811 sc.lockTopicToConsumerChannelMap.Lock()
812 defer sc.lockTopicToConsumerChannelMap.Unlock()
813 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
814 for _, ch := range consumerCh.channels {
815 // Channel will be closed in the removeChannel method
816 removeChannel(consumerCh.channels, ch)
817 }
818 err := closeConsumers(consumerCh.consumers)
819 //if err == sarama.ErrUnknownTopicOrPartition {
820 // // Not an error
821 // err = nil
822 //}
823 //err := consumerCh.consumers.Close()
824 delete(sc.topicToConsumerChannelMap, topic.Name)
825 return err
826 }
khenaidoob332f9b2020-01-16 16:25:26 -0500827 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700828 return nil
829}
830
Scott Baker2c1c4822019-10-16 11:02:41 -0700831//createPublisher creates the publisher which is used to send a message onto kafka
832func (sc *SaramaClient) createPublisher() error {
833 // This Creates the publisher
834 config := sarama.NewConfig()
835 config.Producer.Partitioner = sarama.NewRandomPartitioner
836 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
837 config.Producer.Flush.Messages = sc.producerFlushMessages
838 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
839 config.Producer.Return.Errors = sc.producerReturnErrors
840 config.Producer.Return.Successes = sc.producerReturnSuccess
841 //config.Producer.RequiredAcks = sarama.WaitForAll
842 config.Producer.RequiredAcks = sarama.WaitForLocal
843
844 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
845 brokers := []string{kafkaFullAddr}
846
847 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500848 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700849 return err
850 } else {
851 sc.producer = producer
852 }
khenaidoob332f9b2020-01-16 16:25:26 -0500853 logger.Info("Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700854 return nil
855}
856
857func (sc *SaramaClient) createConsumer() error {
858 config := sarama.NewConfig()
859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
864 config.Metadata.Retry.Max = sc.metadataMaxRetry
865 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
866 brokers := []string{kafkaFullAddr}
867
868 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500869 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700870 return err
871 } else {
872 sc.consumer = consumer
873 }
khenaidoob332f9b2020-01-16 16:25:26 -0500874 logger.Info("Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700875 return nil
876}
877
878// createGroupConsumer creates a consumers group
879func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
880 config := scc.NewConfig()
881 config.ClientID = uuid.New().String()
882 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700883 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
884 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700885 //config.Group.Return.Notifications = false
886 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
887 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
888 config.Consumer.Offsets.Initial = initialOffset
889 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
890 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
891 brokers := []string{kafkaFullAddr}
892
893 topics := []string{topic.Name}
894 var consumer *scc.Consumer
895 var err error
896
897 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500898 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700899 return nil, err
900 }
khenaidoob332f9b2020-01-16 16:25:26 -0500901 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700902
903 //sc.groupConsumers[topic.Name] = consumer
904 sc.addToGroupConsumers(topic.Name, consumer)
905 return consumer, nil
906}
907
908// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
909// topic via the unique channel each subscriber received during subscription
910func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
911 // Need to go over all channels and publish messages to them - do we need to copy msg?
912 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700913 for _, ch := range consumerCh.channels {
914 go func(c chan *ic.InterContainerMessage) {
915 c <- protoMessage
916 }(ch)
917 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500918 sc.lockTopicToConsumerChannelMap.RUnlock()
919
920 if callback := sc.metadataCallback; callback != nil {
Scott Baker84a55ce2020-04-17 10:11:30 -0700921 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
922 callback(protoMessage.Header.FromTopic, ts)
Kent Hagermanccfa2132019-12-17 13:29:34 -0500923 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700924}
925
926func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500927 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700928startloop:
929 for {
930 select {
931 case err, ok := <-consumer.Errors():
932 if ok {
cbabud4978652019-12-04 08:04:21 +0100933 if sc.isLivenessError(err) {
934 sc.updateLiveness(false)
khenaidoob332f9b2020-01-16 16:25:26 -0500935 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100936 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700937 } else {
938 // Channel is closed
939 break startloop
940 }
941 case msg, ok := <-consumer.Messages():
khenaidoob332f9b2020-01-16 16:25:26 -0500942 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700943 if !ok {
944 // channel is closed
945 break startloop
946 }
947 msgBody := msg.Value
cbabud4978652019-12-04 08:04:21 +0100948 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -0500949 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700950 icm := &ic.InterContainerMessage{}
951 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500952 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700953 continue
954 }
955 go sc.dispatchToConsumers(consumerChnls, icm)
956 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500957 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700958 break startloop
959 }
960 }
khenaidoob332f9b2020-01-16 16:25:26 -0500961 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -0800962 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -0700963}
964
965func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500966 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700967
968startloop:
969 for {
970 select {
971 case err, ok := <-consumer.Errors():
972 if ok {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800973 if sc.isLivenessError(err) {
974 sc.updateLiveness(false)
975 }
khenaidoob332f9b2020-01-16 16:25:26 -0500976 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700977 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500978 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700979 // channel is closed
980 break startloop
981 }
982 case msg, ok := <-consumer.Messages():
983 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500984 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700985 // Channel closed
986 break startloop
987 }
Scott Baker104b67d2019-10-29 15:56:27 -0700988 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -0500989 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700990 msgBody := msg.Value
991 icm := &ic.InterContainerMessage{}
992 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500993 logger.Warnw("invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700994 continue
995 }
996 go sc.dispatchToConsumers(consumerChnls, icm)
997 consumer.MarkOffset(msg, "")
998 case ntf := <-consumer.Notifications():
khenaidoob332f9b2020-01-16 16:25:26 -0500999 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001000 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -05001001 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001002 break startloop
1003 }
1004 }
khenaidoob332f9b2020-01-16 16:25:26 -05001005 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -08001006 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -07001007}
1008
1009func (sc *SaramaClient) startConsumers(topic *Topic) error {
khenaidoob332f9b2020-01-16 16:25:26 -05001010 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001011 var consumerCh *consumerChannels
1012 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001013 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001014 return errors.New("consumers-not-exist")
1015 }
1016 // For each consumer listening for that topic, start a consumption loop
1017 for _, consumer := range consumerCh.consumers {
1018 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1019 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1020 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1021 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1022 } else {
khenaidoob332f9b2020-01-16 16:25:26 -05001023 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001024 return errors.New("invalid-consumer")
1025 }
1026 }
1027 return nil
1028}
1029
1030//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1031//// for that topic. It also starts the routine that listens for messages on that topic.
1032func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1033 var pConsumers []sarama.PartitionConsumer
1034 var err error
1035
1036 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001037 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001038 return nil, err
1039 }
1040
1041 consumersIf := make([]interface{}, 0)
1042 for _, pConsumer := range pConsumers {
1043 consumersIf = append(consumersIf, pConsumer)
1044 }
1045
1046 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1047 // unbuffered to verify race conditions.
1048 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1049 cc := &consumerChannels{
1050 consumers: consumersIf,
1051 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1052 }
1053
1054 // Add the consumers channel to the map
1055 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1056
1057 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001058 go func() {
1059 if err := sc.startConsumers(topic); err != nil {
1060 logger.Errorw("start-consumers-failed", log.Fields{
1061 "topic": topic,
1062 "error": err})
1063 }
1064 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001065
1066 return consumerListeningChannel, nil
1067}
1068
1069// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1070// for that topic. It also starts the routine that listens for messages on that topic.
1071func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1072 // TODO: Replace this development partition consumers with a group consumers
1073 var pConsumer *scc.Consumer
1074 var err error
1075 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001076 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001077 return nil, err
1078 }
1079 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1080 // unbuffered to verify race conditions.
1081 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1082 cc := &consumerChannels{
1083 consumers: []interface{}{pConsumer},
1084 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1085 }
1086
1087 // Add the consumers channel to the map
1088 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1089
1090 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001091 go func() {
1092 if err := sc.startConsumers(topic); err != nil {
1093 logger.Errorw("start-consumers-failed", log.Fields{
1094 "topic": topic,
1095 "error": err})
1096 }
1097 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001098
1099 return consumerListeningChannel, nil
1100}
1101
1102func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoob332f9b2020-01-16 16:25:26 -05001103 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001104 partitionList, err := sc.consumer.Partitions(topic.Name)
1105 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001106 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001107 return nil, err
1108 }
1109
1110 pConsumers := make([]sarama.PartitionConsumer, 0)
1111 for _, partition := range partitionList {
1112 var pConsumer sarama.PartitionConsumer
1113 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001114 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001115 return nil, err
1116 }
1117 pConsumers = append(pConsumers, pConsumer)
1118 }
1119 return pConsumers, nil
1120}
1121
1122func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1123 var i int
1124 var channel chan *ic.InterContainerMessage
1125 for i, channel = range channels {
1126 if channel == ch {
1127 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1128 close(channel)
khenaidoob332f9b2020-01-16 16:25:26 -05001129 logger.Debug("channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001130 return channels[:len(channels)-1]
1131 }
1132 }
1133 return channels
1134}
1135
1136func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1137 sc.lockOfGroupConsumers.Lock()
1138 defer sc.lockOfGroupConsumers.Unlock()
1139 if _, exist := sc.groupConsumers[topic]; !exist {
1140 sc.groupConsumers[topic] = consumer
1141 }
1142}
1143
1144func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1145 sc.lockOfGroupConsumers.Lock()
1146 defer sc.lockOfGroupConsumers.Unlock()
1147 if _, exist := sc.groupConsumers[topic]; exist {
1148 consumer := sc.groupConsumers[topic]
1149 delete(sc.groupConsumers, topic)
1150 if err := consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001151 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001152 return err
1153 }
1154 }
1155 return nil
1156}