blob: c0c16f94bb7764216c38311008dd33ba38118871 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "strings"
23 "sync"
24 "time"
25
Scott Baker2c1c4822019-10-16 11:02:41 -070026 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
serkant.uluderyab38671c2019-11-01 09:35:38 -070031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
// returnErrorFunction is the signature of a callback that takes no arguments
// and reports its outcome as an error.
type returnErrorFunction func() error
36
37// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
38// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
39//consumer or a group consumer
40type consumerChannels struct {
41 consumers []interface{}
42 channels []chan *ic.InterContainerMessage
43}
44
Kent Hagermanccfa2132019-12-17 13:29:34 -050045// static check to ensure SaramaClient implements Client
46var _ Client = &SaramaClient{}
47
Scott Baker2c1c4822019-10-16 11:02:41 -070048// SaramaClient represents the messaging proxy
49type SaramaClient struct {
50 cAdmin sarama.ClusterAdmin
51 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
56 groupConsumers map[string]*scc.Consumer
57 lockOfGroupConsumers sync.RWMutex
58 consumerGroupPrefix string
59 consumerType int
60 consumerGroupName string
61 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
73 doneCh chan int
Kent Hagermanccfa2132019-12-17 13:29:34 -050074 metadataCallback func(fromTopic string, timestamp int64)
Scott Baker2c1c4822019-10-16 11:02:41 -070075 topicToConsumerChannelMap map[string]*consumerChannels
76 lockTopicToConsumerChannelMap sync.RWMutex
77 topicLockMap map[string]*sync.RWMutex
78 lockOfTopicLockMap sync.RWMutex
79 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070080 alive bool
81 liveness chan bool
82 livenessChannelInterval time.Duration
83 lastLivenessTime time.Time
84 started bool
Scott Baker0fef6982019-12-12 09:49:42 -080085 healthy bool
86 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070087}
88
89type SaramaClientOption func(*SaramaClient)
90
91func Host(host string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.KafkaHost = host
94 }
95}
96
97func Port(port int) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.KafkaPort = port
100 }
101}
102
103func ConsumerGroupPrefix(prefix string) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerGroupPrefix = prefix
106 }
107}
108
109func ConsumerGroupName(name string) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.consumerGroupName = name
112 }
113}
114
115func ConsumerType(consumer int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.consumerType = consumer
118 }
119}
120
121func ProducerFlushFrequency(frequency int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushFrequency = frequency
124 }
125}
126
127func ProducerFlushMessages(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerFlushMessages = num
130 }
131}
132
133func ProducerFlushMaxMessages(num int) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerFlushMaxmessages = num
136 }
137}
138
139func ProducerMaxRetries(num int) SaramaClientOption {
140 return func(args *SaramaClient) {
141 args.producerRetryMax = num
142 }
143}
144
145func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
146 return func(args *SaramaClient) {
147 args.producerRetryBackOff = duration
148 }
149}
150
151func ProducerReturnOnErrors(opt bool) SaramaClientOption {
152 return func(args *SaramaClient) {
153 args.producerReturnErrors = opt
154 }
155}
156
157func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
158 return func(args *SaramaClient) {
159 args.producerReturnSuccess = opt
160 }
161}
162
163func ConsumerMaxWait(wait int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.consumerMaxwait = wait
166 }
167}
168
169func MaxProcessingTime(pTime int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.maxProcessingTime = pTime
172 }
173}
174
175func NumPartitions(number int) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.numPartitions = number
178 }
179}
180
181func NumReplicas(number int) SaramaClientOption {
182 return func(args *SaramaClient) {
183 args.numReplicas = number
184 }
185}
186
187func AutoCreateTopic(opt bool) SaramaClientOption {
188 return func(args *SaramaClient) {
189 args.autoCreateTopic = opt
190 }
191}
192
193func MetadatMaxRetries(retry int) SaramaClientOption {
194 return func(args *SaramaClient) {
195 args.metadataMaxRetry = retry
196 }
197}
198
Scott Baker104b67d2019-10-29 15:56:27 -0700199func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
200 return func(args *SaramaClient) {
201 args.livenessChannelInterval = opt
202 }
203}
204
Scott Baker2c1c4822019-10-16 11:02:41 -0700205func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
206 client := &SaramaClient{
207 KafkaHost: DefaultKafkaHost,
208 KafkaPort: DefaultKafkaPort,
209 }
210 client.consumerType = DefaultConsumerType
211 client.producerFlushFrequency = DefaultProducerFlushFrequency
212 client.producerFlushMessages = DefaultProducerFlushMessages
213 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
214 client.producerReturnErrors = DefaultProducerReturnErrors
215 client.producerReturnSuccess = DefaultProducerReturnSuccess
216 client.producerRetryMax = DefaultProducerRetryMax
217 client.producerRetryBackOff = DefaultProducerRetryBackoff
218 client.consumerMaxwait = DefaultConsumerMaxwait
219 client.maxProcessingTime = DefaultMaxProcessingTime
220 client.numPartitions = DefaultNumberPartitions
221 client.numReplicas = DefaultNumberReplicas
222 client.autoCreateTopic = DefaultAutoCreateTopic
223 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700224 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700225
226 for _, option := range opts {
227 option(client)
228 }
229
230 client.groupConsumers = make(map[string]*scc.Consumer)
231
232 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
233 client.topicLockMap = make(map[string]*sync.RWMutex)
234 client.lockOfTopicLockMap = sync.RWMutex{}
235 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700236
Scott Baker0fef6982019-12-12 09:49:42 -0800237 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700238 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800239 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700240
Scott Baker2c1c4822019-10-16 11:02:41 -0700241 return client
242}
243
244func (sc *SaramaClient) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500245 logger.Info("Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700246
247 // Create the Done channel
248 sc.doneCh = make(chan int, 1)
249
250 var err error
251
252 // Add a cleanup in case of failure to startup
253 defer func() {
254 if err != nil {
255 sc.Stop()
256 }
257 }()
258
259 // Create the Cluster Admin
260 if err = sc.createClusterAdmin(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500261 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700262 return err
263 }
264
265 // Create the Publisher
266 if err := sc.createPublisher(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500267 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700268 return err
269 }
270
271 if sc.consumerType == DefaultConsumerType {
272 // Create the master consumers
273 if err := sc.createConsumer(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500274 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700275 return err
276 }
277 }
278
279 // Create the topic to consumers/channel map
280 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
281
khenaidoob332f9b2020-01-16 16:25:26 -0500282 logger.Info("kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700283
Scott Baker104b67d2019-10-29 15:56:27 -0700284 sc.started = true
285
Scott Baker2c1c4822019-10-16 11:02:41 -0700286 return nil
287}
288
289func (sc *SaramaClient) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500290 logger.Info("stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700291
Scott Baker104b67d2019-10-29 15:56:27 -0700292 sc.started = false
293
Scott Baker2c1c4822019-10-16 11:02:41 -0700294 //Send a message over the done channel to close all long running routines
295 sc.doneCh <- 1
296
297 if sc.producer != nil {
298 if err := sc.producer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500299 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700300 }
301 }
302
303 if sc.consumer != nil {
304 if err := sc.consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500305 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700306 }
307 }
308
309 for key, val := range sc.groupConsumers {
khenaidoob332f9b2020-01-16 16:25:26 -0500310 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700311 if err := val.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500312 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700313 }
314 }
315
316 if sc.cAdmin != nil {
317 if err := sc.cAdmin.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500318 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700319 }
320 }
321
322 //TODO: Clear the consumers map
323 //sc.clearConsumerChannelMap()
324
khenaidoob332f9b2020-01-16 16:25:26 -0500325 logger.Info("sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700326}
327
328//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
329// the invoking function must hold the lock
330func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
331 // Set the topic details
332 topicDetail := &sarama.TopicDetail{}
333 topicDetail.NumPartitions = int32(numPartition)
334 topicDetail.ReplicationFactor = int16(repFactor)
335 topicDetail.ConfigEntries = make(map[string]*string)
336 topicDetails := make(map[string]*sarama.TopicDetail)
337 topicDetails[topic.Name] = topicDetail
338
339 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
340 if err == sarama.ErrTopicAlreadyExists {
341 // Not an error
khenaidoob332f9b2020-01-16 16:25:26 -0500342 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700343 return nil
344 }
khenaidoob332f9b2020-01-16 16:25:26 -0500345 logger.Errorw("create-topic-failure", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700346 return err
347 }
348 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
349 // do so.
khenaidoob332f9b2020-01-16 16:25:26 -0500350 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700351 return nil
352}
353
354//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
355// ensure no two go routines are performing operations on the same topic
356func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
360 return sc.createTopic(topic, numPartition, repFactor)
361}
362
363//DeleteTopic removes a topic from the kafka Broker
364func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
365 sc.lockTopic(topic)
366 defer sc.unLockTopic(topic)
367
368 // Remove the topic from the broker
369 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
370 if err == sarama.ErrUnknownTopicOrPartition {
371 // Not an error as does not exist
khenaidoob332f9b2020-01-16 16:25:26 -0500372 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700373 return nil
374 }
khenaidoob332f9b2020-01-16 16:25:26 -0500375 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700376 return err
377 }
378
379 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
380 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500381 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700382 return err
383 }
384 return nil
385}
386
387// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
388// messages from that topic
389func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
390 sc.lockTopic(topic)
391 defer sc.unLockTopic(topic)
392
khenaidoob332f9b2020-01-16 16:25:26 -0500393 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700394
395 // If a consumers already exist for that topic then resuse it
396 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500397 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700398 // Create a channel specific for that consumers and add it to the consumers channel map
399 ch := make(chan *ic.InterContainerMessage)
400 sc.addChannelToConsumerChannelMap(topic, ch)
401 return ch, nil
402 }
403
404 // Register for the topic and set it up
405 var consumerListeningChannel chan *ic.InterContainerMessage
406 var err error
407
408 // Use the consumerType option to figure out the type of consumer to launch
409 if sc.consumerType == PartitionConsumer {
410 if sc.autoCreateTopic {
411 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500412 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700413 return nil, err
414 }
415 }
416 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500417 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700418 return nil, err
419 }
420 } else if sc.consumerType == GroupCustomer {
421 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
422 // does not consume from a precreated topic in some scenarios
423 //if sc.autoCreateTopic {
424 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500425 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700426 // return nil, err
427 // }
428 //}
429 //groupId := sc.consumerGroupName
430 groupId := getGroupId(kvArgs...)
431 // Include the group prefix
432 if groupId != "" {
433 groupId = sc.consumerGroupPrefix + groupId
434 } else {
435 // Need to use a unique group Id per topic
436 groupId = sc.consumerGroupPrefix + topic.Name
437 }
438 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500439 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700440 return nil, err
441 }
442
443 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500444 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700445 return nil, errors.New("unknown-consumer-type")
446 }
447
448 return consumerListeningChannel, nil
449}
450
451//UnSubscribe unsubscribe a consumer from a given topic
452func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
453 sc.lockTopic(topic)
454 defer sc.unLockTopic(topic)
455
khenaidoob332f9b2020-01-16 16:25:26 -0500456 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700457 var err error
458 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500459 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700460 }
461 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500462 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700463 }
464 return err
465}
466
Kent Hagermanccfa2132019-12-17 13:29:34 -0500467func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
468 sc.metadataCallback = callback
469}
470
Scott Baker104b67d2019-10-29 15:56:27 -0700471func (sc *SaramaClient) updateLiveness(alive bool) {
472 // Post a consistent stream of liveness data to the channel,
473 // so that in a live state, the core does not timeout and
474 // send a forced liveness message. Production of liveness
475 // events to the channel is rate-limited by livenessChannelInterval.
476 if sc.liveness != nil {
477 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500478 logger.Info("update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700479 sc.liveness <- alive
480 sc.lastLivenessTime = time.Now()
481 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
khenaidoob332f9b2020-01-16 16:25:26 -0500482 logger.Info("update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700483 sc.liveness <- alive
484 sc.lastLivenessTime = time.Now()
485 }
486 }
487
488 // Only emit a log message when the state changes
489 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500490 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700491 sc.alive = alive
492 }
493}
494
Scott Baker0fef6982019-12-12 09:49:42 -0800495// Once unhealthy, we never go back
496func (sc *SaramaClient) setUnhealthy() {
497 sc.healthy = false
498 if sc.healthiness != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500499 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800500 sc.healthiness <- sc.healthy
501 }
502}
503
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800504func (sc *SaramaClient) isLivenessError(err error) bool {
505 // Sarama producers and consumers encapsulate the error inside
506 // a ProducerError or ConsumerError struct.
507 if prodError, ok := err.(*sarama.ProducerError); ok {
508 err = prodError.Err
509 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
510 err = consumerError.Err
511 }
512
513 // Sarama-Cluster will compose the error into a ClusterError struct,
514 // which we can't do a compare by reference. To handle that, we the
515 // best we can do is compare the error strings.
516
517 switch err.Error() {
518 case context.DeadlineExceeded.Error():
khenaidoob332f9b2020-01-16 16:25:26 -0500519 logger.Info("is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800520 return true
521 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
khenaidoob332f9b2020-01-16 16:25:26 -0500522 logger.Info("is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800523 return true
524 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
khenaidoob332f9b2020-01-16 16:25:26 -0500525 logger.Info("is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800526 return true
527 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
khenaidoob332f9b2020-01-16 16:25:26 -0500528 logger.Info("is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800529 return true
530 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
khenaidoob332f9b2020-01-16 16:25:26 -0500531 logger.Info("is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800532 return true
533 }
534
535 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
khenaidoob332f9b2020-01-16 16:25:26 -0500536 logger.Info("is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800537 return true
538 }
539
Scott Baker718bee02020-01-07 09:52:02 -0800540 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
khenaidoob332f9b2020-01-16 16:25:26 -0500541 logger.Info("is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800542 return true
543 }
544
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800545 // Other errors shouldn't trigger a loss of liveness
546
khenaidoob332f9b2020-01-16 16:25:26 -0500547 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800548
549 return false
550}
551
Scott Baker2c1c4822019-10-16 11:02:41 -0700552// send formats and sends the request onto the kafka messaging bus.
553func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
554
555 // Assert message is a proto message
556 var protoMsg proto.Message
557 var ok bool
558 // ascertain the value interface type is a proto.Message
559 if protoMsg, ok = msg.(proto.Message); !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500560 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Scott Baker2c1c4822019-10-16 11:02:41 -0700561 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
562 }
563
564 var marshalled []byte
565 var err error
566 // Create the Sarama producer message
567 if marshalled, err = proto.Marshal(protoMsg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500568 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700569 return err
570 }
571 key := ""
572 if len(keys) > 0 {
573 key = keys[0] // Only the first key is relevant
574 }
575 kafkaMsg := &sarama.ProducerMessage{
576 Topic: topic.Name,
577 Key: sarama.StringEncoder(key),
578 Value: sarama.ByteEncoder(marshalled),
579 }
580
581 // Send message to kafka
582 sc.producer.Input() <- kafkaMsg
583 // Wait for result
584 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
585 select {
586 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500587 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700588 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700589 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500590 logger.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800591 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700592 sc.updateLiveness(false)
593 }
594 return notOk
595 }
596 return nil
597}
598
599// Enable the liveness monitor channel. This channel will report
600// a "true" or "false" on every publish, which indicates whether
601// or not the channel is still live. This channel is then picked up
602// by the service (i.e. rw_core / ro_core) to update readiness status
603// and/or take other actions.
604func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500605 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700606 if enable {
607 if sc.liveness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500608 logger.Info("kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700609 // At least 1, so we can immediately post to it without blocking
610 // Setting a bigger number (10) allows the monitor to fall behind
611 // without blocking others. The monitor shouldn't really fall
612 // behind...
613 sc.liveness = make(chan bool, 10)
614 // post intial state to the channel
615 sc.liveness <- sc.alive
616 }
617 } else {
618 // TODO: Think about whether we need the ability to turn off
619 // liveness monitoring
620 panic("Turning off liveness reporting is not supported")
621 }
622 return sc.liveness
623}
624
Scott Baker0fef6982019-12-12 09:49:42 -0800625// Enable the Healthiness monitor channel. This channel will report "false"
626// if the kafka consumers die, or some other problem occurs which is
627// catastrophic that would require re-creating the client.
628func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500629 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800630 if enable {
631 if sc.healthiness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500632 logger.Info("kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post intial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // liveness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
Scott Baker104b67d2019-10-29 15:56:27 -0700649// send an empty message on the liveness channel to check whether connectivity has
650// been restored.
651func (sc *SaramaClient) SendLiveness() error {
652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500667 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700668 sc.updateLiveness(true)
669 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500670 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800671 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700672 sc.updateLiveness(false)
673 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700674 return notOk
675 }
676 return nil
677}
678
679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
699func (sc *SaramaClient) createClusterAdmin() error {
700 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
707 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500708 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
Scott Baker2c1c4822019-10-16 11:02:41 -0700709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
743func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
744 sc.lockTopicToConsumerChannelMap.Lock()
745 defer sc.lockTopicToConsumerChannelMap.Unlock()
746 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
747 delete(sc.topicToConsumerChannelMap, id)
748 }
749}
750
751func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
752 sc.lockTopicToConsumerChannelMap.RLock()
753 defer sc.lockTopicToConsumerChannelMap.RUnlock()
754
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 return consumerCh
757 }
758 return nil
759}
760
761func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
762 sc.lockTopicToConsumerChannelMap.Lock()
763 defer sc.lockTopicToConsumerChannelMap.Unlock()
764 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
765 consumerCh.channels = append(consumerCh.channels, ch)
766 return
767 }
khenaidoob332f9b2020-01-16 16:25:26 -0500768 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700769}
770
771//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
772func closeConsumers(consumers []interface{}) error {
773 var err error
774 for _, consumer := range consumers {
775 // Is it a partition consumers?
776 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
777 if errTemp := partionConsumer.Close(); errTemp != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500778 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
787 if errTemp := groupConsumer.Close(); errTemp != nil {
788 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
789 // This can occur on race condition
790 err = nil
791 } else {
792 err = errTemp
793 }
794 }
795 }
796 }
797 return err
798}
799
800func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
801 sc.lockTopicToConsumerChannelMap.Lock()
802 defer sc.lockTopicToConsumerChannelMap.Unlock()
803 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
804 // Channel will be closed in the removeChannel method
805 consumerCh.channels = removeChannel(consumerCh.channels, ch)
806 // If there are no more channels then we can close the consumers itself
807 if len(consumerCh.channels) == 0 {
khenaidoob332f9b2020-01-16 16:25:26 -0500808 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700809 err := closeConsumers(consumerCh.consumers)
810 //err := consumerCh.consumers.Close()
811 delete(sc.topicToConsumerChannelMap, topic.Name)
812 return err
813 }
814 return nil
815 }
khenaidoob332f9b2020-01-16 16:25:26 -0500816 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700817 return errors.New("topic-does-not-exist")
818}
819
820func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
821 sc.lockTopicToConsumerChannelMap.Lock()
822 defer sc.lockTopicToConsumerChannelMap.Unlock()
823 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
824 for _, ch := range consumerCh.channels {
825 // Channel will be closed in the removeChannel method
826 removeChannel(consumerCh.channels, ch)
827 }
828 err := closeConsumers(consumerCh.consumers)
829 //if err == sarama.ErrUnknownTopicOrPartition {
830 // // Not an error
831 // err = nil
832 //}
833 //err := consumerCh.consumers.Close()
834 delete(sc.topicToConsumerChannelMap, topic.Name)
835 return err
836 }
khenaidoob332f9b2020-01-16 16:25:26 -0500837 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700838 return nil
839}
840
841func (sc *SaramaClient) clearConsumerChannelMap() error {
842 sc.lockTopicToConsumerChannelMap.Lock()
843 defer sc.lockTopicToConsumerChannelMap.Unlock()
844 var err error
845 for topic, consumerCh := range sc.topicToConsumerChannelMap {
846 for _, ch := range consumerCh.channels {
847 // Channel will be closed in the removeChannel method
848 removeChannel(consumerCh.channels, ch)
849 }
850 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
851 err = errTemp
852 }
853 //err = consumerCh.consumers.Close()
854 delete(sc.topicToConsumerChannelMap, topic)
855 }
856 return err
857}
858
859//createPublisher creates the publisher which is used to send a message onto kafka
860func (sc *SaramaClient) createPublisher() error {
861 // This Creates the publisher
862 config := sarama.NewConfig()
863 config.Producer.Partitioner = sarama.NewRandomPartitioner
864 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
865 config.Producer.Flush.Messages = sc.producerFlushMessages
866 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
867 config.Producer.Return.Errors = sc.producerReturnErrors
868 config.Producer.Return.Successes = sc.producerReturnSuccess
869 //config.Producer.RequiredAcks = sarama.WaitForAll
870 config.Producer.RequiredAcks = sarama.WaitForLocal
871
872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
875 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500876 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700877 return err
878 } else {
879 sc.producer = producer
880 }
khenaidoob332f9b2020-01-16 16:25:26 -0500881 logger.Info("Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700882 return nil
883}
884
885func (sc *SaramaClient) createConsumer() error {
886 config := sarama.NewConfig()
887 config.Consumer.Return.Errors = true
888 config.Consumer.Fetch.Min = 1
889 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
890 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
891 config.Consumer.Offsets.Initial = sarama.OffsetNewest
892 config.Metadata.Retry.Max = sc.metadataMaxRetry
893 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
894 brokers := []string{kafkaFullAddr}
895
896 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500897 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700898 return err
899 } else {
900 sc.consumer = consumer
901 }
khenaidoob332f9b2020-01-16 16:25:26 -0500902 logger.Info("Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700903 return nil
904}
905
906// createGroupConsumer creates a consumers group
907func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
908 config := scc.NewConfig()
909 config.ClientID = uuid.New().String()
910 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700911 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
912 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700913 //config.Group.Return.Notifications = false
914 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
915 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
916 config.Consumer.Offsets.Initial = initialOffset
917 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
918 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
919 brokers := []string{kafkaFullAddr}
920
921 topics := []string{topic.Name}
922 var consumer *scc.Consumer
923 var err error
924
925 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500926 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700927 return nil, err
928 }
khenaidoob332f9b2020-01-16 16:25:26 -0500929 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700930
931 //sc.groupConsumers[topic.Name] = consumer
932 sc.addToGroupConsumers(topic.Name, consumer)
933 return consumer, nil
934}
935
936// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
937// topic via the unique channel each subscriber received during subscription
938func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
939 // Need to go over all channels and publish messages to them - do we need to copy msg?
940 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700941 for _, ch := range consumerCh.channels {
942 go func(c chan *ic.InterContainerMessage) {
943 c <- protoMessage
944 }(ch)
945 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500946 sc.lockTopicToConsumerChannelMap.RUnlock()
947
948 if callback := sc.metadataCallback; callback != nil {
949 callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
950 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700951}
952
953func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500954 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700955startloop:
956 for {
957 select {
958 case err, ok := <-consumer.Errors():
959 if ok {
cbabud4978652019-12-04 08:04:21 +0100960 if sc.isLivenessError(err) {
961 sc.updateLiveness(false)
khenaidoob332f9b2020-01-16 16:25:26 -0500962 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100963 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700964 } else {
965 // Channel is closed
966 break startloop
967 }
968 case msg, ok := <-consumer.Messages():
khenaidoob332f9b2020-01-16 16:25:26 -0500969 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700970 if !ok {
971 // channel is closed
972 break startloop
973 }
974 msgBody := msg.Value
cbabud4978652019-12-04 08:04:21 +0100975 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -0500976 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700977 icm := &ic.InterContainerMessage{}
978 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500979 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700980 continue
981 }
982 go sc.dispatchToConsumers(consumerChnls, icm)
983 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500984 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700985 break startloop
986 }
987 }
khenaidoob332f9b2020-01-16 16:25:26 -0500988 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -0800989 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -0700990}
991
992func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500993 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700994
995startloop:
996 for {
997 select {
998 case err, ok := <-consumer.Errors():
999 if ok {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -08001000 if sc.isLivenessError(err) {
1001 sc.updateLiveness(false)
1002 }
khenaidoob332f9b2020-01-16 16:25:26 -05001003 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001004 } else {
khenaidoob332f9b2020-01-16 16:25:26 -05001005 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001006 // channel is closed
1007 break startloop
1008 }
1009 case msg, ok := <-consumer.Messages():
1010 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -05001011 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001012 // Channel closed
1013 break startloop
1014 }
Scott Baker104b67d2019-10-29 15:56:27 -07001015 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -05001016 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001017 msgBody := msg.Value
1018 icm := &ic.InterContainerMessage{}
1019 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001020 logger.Warnw("invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001021 continue
1022 }
1023 go sc.dispatchToConsumers(consumerChnls, icm)
1024 consumer.MarkOffset(msg, "")
1025 case ntf := <-consumer.Notifications():
khenaidoob332f9b2020-01-16 16:25:26 -05001026 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001027 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -05001028 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001029 break startloop
1030 }
1031 }
khenaidoob332f9b2020-01-16 16:25:26 -05001032 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -08001033 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -07001034}
1035
1036func (sc *SaramaClient) startConsumers(topic *Topic) error {
khenaidoob332f9b2020-01-16 16:25:26 -05001037 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001038 var consumerCh *consumerChannels
1039 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001040 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001041 return errors.New("consumers-not-exist")
1042 }
1043 // For each consumer listening for that topic, start a consumption loop
1044 for _, consumer := range consumerCh.consumers {
1045 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1046 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1047 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1048 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1049 } else {
khenaidoob332f9b2020-01-16 16:25:26 -05001050 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001051 return errors.New("invalid-consumer")
1052 }
1053 }
1054 return nil
1055}
1056
1057//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1058//// for that topic. It also starts the routine that listens for messages on that topic.
1059func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1060 var pConsumers []sarama.PartitionConsumer
1061 var err error
1062
1063 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001064 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001065 return nil, err
1066 }
1067
1068 consumersIf := make([]interface{}, 0)
1069 for _, pConsumer := range pConsumers {
1070 consumersIf = append(consumersIf, pConsumer)
1071 }
1072
1073 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1074 // unbuffered to verify race conditions.
1075 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1076 cc := &consumerChannels{
1077 consumers: consumersIf,
1078 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1079 }
1080
1081 // Add the consumers channel to the map
1082 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1083
1084 //Start a consumers to listen on that specific topic
1085 go sc.startConsumers(topic)
1086
1087 return consumerListeningChannel, nil
1088}
1089
1090// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1091// for that topic. It also starts the routine that listens for messages on that topic.
1092func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1093 // TODO: Replace this development partition consumers with a group consumers
1094 var pConsumer *scc.Consumer
1095 var err error
1096 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001097 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001098 return nil, err
1099 }
1100 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1101 // unbuffered to verify race conditions.
1102 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1103 cc := &consumerChannels{
1104 consumers: []interface{}{pConsumer},
1105 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1106 }
1107
1108 // Add the consumers channel to the map
1109 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1110
1111 //Start a consumers to listen on that specific topic
1112 go sc.startConsumers(topic)
1113
1114 return consumerListeningChannel, nil
1115}
1116
1117func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoob332f9b2020-01-16 16:25:26 -05001118 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001119 partitionList, err := sc.consumer.Partitions(topic.Name)
1120 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001121 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001122 return nil, err
1123 }
1124
1125 pConsumers := make([]sarama.PartitionConsumer, 0)
1126 for _, partition := range partitionList {
1127 var pConsumer sarama.PartitionConsumer
1128 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001129 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001130 return nil, err
1131 }
1132 pConsumers = append(pConsumers, pConsumer)
1133 }
1134 return pConsumers, nil
1135}
1136
1137func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1138 var i int
1139 var channel chan *ic.InterContainerMessage
1140 for i, channel = range channels {
1141 if channel == ch {
1142 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1143 close(channel)
khenaidoob332f9b2020-01-16 16:25:26 -05001144 logger.Debug("channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001145 return channels[:len(channels)-1]
1146 }
1147 }
1148 return channels
1149}
1150
1151func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1152 sc.lockOfGroupConsumers.Lock()
1153 defer sc.lockOfGroupConsumers.Unlock()
1154 if _, exist := sc.groupConsumers[topic]; !exist {
1155 sc.groupConsumers[topic] = consumer
1156 }
1157}
1158
1159func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1160 sc.lockOfGroupConsumers.Lock()
1161 defer sc.lockOfGroupConsumers.Unlock()
1162 if _, exist := sc.groupConsumers[topic]; exist {
1163 consumer := sc.groupConsumers[topic]
1164 delete(sc.groupConsumers, topic)
1165 if err := consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001166 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001167 return err
1168 }
1169 }
1170 return nil
1171}