blob: 6bc2a4980227b723da4be7998023314b39622739 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080024 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070025 "github.com/golang/protobuf/proto"
26 "github.com/google/uuid"
Scott Bakerce767002019-10-23 13:30:24 -070027 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerf1b096c2019-11-01 12:36:30 -070028 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "strings"
30 "sync"
31 "time"
32)
33
// returnErrorFunction is the signature of a function that takes no arguments
// and reports its outcome as an error.
type returnErrorFunction func() error
35
36// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
38//consumer or a group consumer
39type consumerChannels struct {
40 consumers []interface{}
41 channels []chan *ic.InterContainerMessage
42}
43
44// SaramaClient represents the messaging proxy
45type SaramaClient struct {
46 cAdmin sarama.ClusterAdmin
47 client sarama.Client
48 KafkaHost string
49 KafkaPort int
50 producer sarama.AsyncProducer
51 consumer sarama.Consumer
52 groupConsumers map[string]*scc.Consumer
53 lockOfGroupConsumers sync.RWMutex
54 consumerGroupPrefix string
55 consumerType int
56 consumerGroupName string
57 producerFlushFrequency int
58 producerFlushMessages int
59 producerFlushMaxmessages int
60 producerRetryMax int
61 producerRetryBackOff time.Duration
62 producerReturnSuccess bool
63 producerReturnErrors bool
64 consumerMaxwait int
65 maxProcessingTime int
66 numPartitions int
67 numReplicas int
68 autoCreateTopic bool
69 doneCh chan int
70 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
74 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070075 alive bool
76 liveness chan bool
77 livenessChannelInterval time.Duration
78 lastLivenessTime time.Time
79 started bool
Scott Baker0fef6982019-12-12 09:49:42 -080080 healthy bool
81 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070082}
83
84type SaramaClientOption func(*SaramaClient)
85
86func Host(host string) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaHost = host
89 }
90}
91
92func Port(port int) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.KafkaPort = port
95 }
96}
97
98func ConsumerGroupPrefix(prefix string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupPrefix = prefix
101 }
102}
103
104func ConsumerGroupName(name string) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerGroupName = name
107 }
108}
109
110func ConsumerType(consumer int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.consumerType = consumer
113 }
114}
115
116func ProducerFlushFrequency(frequency int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushFrequency = frequency
119 }
120}
121
122func ProducerFlushMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMessages = num
125 }
126}
127
128func ProducerFlushMaxMessages(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerFlushMaxmessages = num
131 }
132}
133
134func ProducerMaxRetries(num int) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryMax = num
137 }
138}
139
140func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
141 return func(args *SaramaClient) {
142 args.producerRetryBackOff = duration
143 }
144}
145
146func ProducerReturnOnErrors(opt bool) SaramaClientOption {
147 return func(args *SaramaClient) {
148 args.producerReturnErrors = opt
149 }
150}
151
152func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.producerReturnSuccess = opt
155 }
156}
157
158func ConsumerMaxWait(wait int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.consumerMaxwait = wait
161 }
162}
163
164func MaxProcessingTime(pTime int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.maxProcessingTime = pTime
167 }
168}
169
170func NumPartitions(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numPartitions = number
173 }
174}
175
176func NumReplicas(number int) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.numReplicas = number
179 }
180}
181
182func AutoCreateTopic(opt bool) SaramaClientOption {
183 return func(args *SaramaClient) {
184 args.autoCreateTopic = opt
185 }
186}
187
188func MetadatMaxRetries(retry int) SaramaClientOption {
189 return func(args *SaramaClient) {
190 args.metadataMaxRetry = retry
191 }
192}
193
Scott Baker104b67d2019-10-29 15:56:27 -0700194func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
195 return func(args *SaramaClient) {
196 args.livenessChannelInterval = opt
197 }
198}
199
Scott Baker2c1c4822019-10-16 11:02:41 -0700200func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
201 client := &SaramaClient{
202 KafkaHost: DefaultKafkaHost,
203 KafkaPort: DefaultKafkaPort,
204 }
205 client.consumerType = DefaultConsumerType
206 client.producerFlushFrequency = DefaultProducerFlushFrequency
207 client.producerFlushMessages = DefaultProducerFlushMessages
208 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
209 client.producerReturnErrors = DefaultProducerReturnErrors
210 client.producerReturnSuccess = DefaultProducerReturnSuccess
211 client.producerRetryMax = DefaultProducerRetryMax
212 client.producerRetryBackOff = DefaultProducerRetryBackoff
213 client.consumerMaxwait = DefaultConsumerMaxwait
214 client.maxProcessingTime = DefaultMaxProcessingTime
215 client.numPartitions = DefaultNumberPartitions
216 client.numReplicas = DefaultNumberReplicas
217 client.autoCreateTopic = DefaultAutoCreateTopic
218 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700219 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700220
221 for _, option := range opts {
222 option(client)
223 }
224
225 client.groupConsumers = make(map[string]*scc.Consumer)
226
227 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
228 client.topicLockMap = make(map[string]*sync.RWMutex)
229 client.lockOfTopicLockMap = sync.RWMutex{}
230 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700231
Scott Baker0fef6982019-12-12 09:49:42 -0800232 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700233 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800234 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700235
Scott Baker2c1c4822019-10-16 11:02:41 -0700236 return client
237}
238
239func (sc *SaramaClient) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500240 logger.Info("Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700241
242 // Create the Done channel
243 sc.doneCh = make(chan int, 1)
244
245 var err error
246
247 // Add a cleanup in case of failure to startup
248 defer func() {
249 if err != nil {
250 sc.Stop()
251 }
252 }()
253
254 // Create the Cluster Admin
255 if err = sc.createClusterAdmin(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500256 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700257 return err
258 }
259
260 // Create the Publisher
261 if err := sc.createPublisher(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500262 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700263 return err
264 }
265
266 if sc.consumerType == DefaultConsumerType {
267 // Create the master consumers
268 if err := sc.createConsumer(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500269 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700270 return err
271 }
272 }
273
274 // Create the topic to consumers/channel map
275 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
276
khenaidoob332f9b2020-01-16 16:25:26 -0500277 logger.Info("kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700278
Scott Baker104b67d2019-10-29 15:56:27 -0700279 sc.started = true
280
Scott Baker2c1c4822019-10-16 11:02:41 -0700281 return nil
282}
283
284func (sc *SaramaClient) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500285 logger.Info("stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700286
Scott Baker104b67d2019-10-29 15:56:27 -0700287 sc.started = false
288
Scott Baker2c1c4822019-10-16 11:02:41 -0700289 //Send a message over the done channel to close all long running routines
290 sc.doneCh <- 1
291
292 if sc.producer != nil {
293 if err := sc.producer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500294 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700295 }
296 }
297
298 if sc.consumer != nil {
299 if err := sc.consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500300 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700301 }
302 }
303
304 for key, val := range sc.groupConsumers {
khenaidoob332f9b2020-01-16 16:25:26 -0500305 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700306 if err := val.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500307 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700308 }
309 }
310
311 if sc.cAdmin != nil {
312 if err := sc.cAdmin.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500313 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700314 }
315 }
316
317 //TODO: Clear the consumers map
318 //sc.clearConsumerChannelMap()
319
khenaidoob332f9b2020-01-16 16:25:26 -0500320 logger.Info("sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700321}
322
323//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
324// the invoking function must hold the lock
325func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
326 // Set the topic details
327 topicDetail := &sarama.TopicDetail{}
328 topicDetail.NumPartitions = int32(numPartition)
329 topicDetail.ReplicationFactor = int16(repFactor)
330 topicDetail.ConfigEntries = make(map[string]*string)
331 topicDetails := make(map[string]*sarama.TopicDetail)
332 topicDetails[topic.Name] = topicDetail
333
334 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
335 if err == sarama.ErrTopicAlreadyExists {
336 // Not an error
khenaidoob332f9b2020-01-16 16:25:26 -0500337 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700338 return nil
339 }
khenaidoob332f9b2020-01-16 16:25:26 -0500340 logger.Errorw("create-topic-failure", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700341 return err
342 }
343 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
344 // do so.
khenaidoob332f9b2020-01-16 16:25:26 -0500345 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700346 return nil
347}
348
349//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
350// ensure no two go routines are performing operations on the same topic
351func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
352 sc.lockTopic(topic)
353 defer sc.unLockTopic(topic)
354
355 return sc.createTopic(topic, numPartition, repFactor)
356}
357
358//DeleteTopic removes a topic from the kafka Broker
359func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
360 sc.lockTopic(topic)
361 defer sc.unLockTopic(topic)
362
363 // Remove the topic from the broker
364 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
365 if err == sarama.ErrUnknownTopicOrPartition {
366 // Not an error as does not exist
khenaidoob332f9b2020-01-16 16:25:26 -0500367 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700368 return nil
369 }
khenaidoob332f9b2020-01-16 16:25:26 -0500370 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700371 return err
372 }
373
374 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
375 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500376 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700377 return err
378 }
379 return nil
380}
381
382// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
383// messages from that topic
384func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
385 sc.lockTopic(topic)
386 defer sc.unLockTopic(topic)
387
khenaidoob332f9b2020-01-16 16:25:26 -0500388 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700389
390 // If a consumers already exist for that topic then resuse it
391 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500392 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700393 // Create a channel specific for that consumers and add it to the consumers channel map
394 ch := make(chan *ic.InterContainerMessage)
395 sc.addChannelToConsumerChannelMap(topic, ch)
396 return ch, nil
397 }
398
399 // Register for the topic and set it up
400 var consumerListeningChannel chan *ic.InterContainerMessage
401 var err error
402
403 // Use the consumerType option to figure out the type of consumer to launch
404 if sc.consumerType == PartitionConsumer {
405 if sc.autoCreateTopic {
406 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500407 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700408 return nil, err
409 }
410 }
411 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500412 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700413 return nil, err
414 }
415 } else if sc.consumerType == GroupCustomer {
416 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
417 // does not consume from a precreated topic in some scenarios
418 //if sc.autoCreateTopic {
419 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500420 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700421 // return nil, err
422 // }
423 //}
424 //groupId := sc.consumerGroupName
425 groupId := getGroupId(kvArgs...)
426 // Include the group prefix
427 if groupId != "" {
428 groupId = sc.consumerGroupPrefix + groupId
429 } else {
430 // Need to use a unique group Id per topic
431 groupId = sc.consumerGroupPrefix + topic.Name
432 }
433 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500434 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700435 return nil, err
436 }
437
438 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500439 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700440 return nil, errors.New("unknown-consumer-type")
441 }
442
443 return consumerListeningChannel, nil
444}
445
446//UnSubscribe unsubscribe a consumer from a given topic
447func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
448 sc.lockTopic(topic)
449 defer sc.unLockTopic(topic)
450
khenaidoob332f9b2020-01-16 16:25:26 -0500451 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700452 var err error
453 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500454 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700455 }
456 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500457 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700458 }
459 return err
460}
461
Scott Baker104b67d2019-10-29 15:56:27 -0700462func (sc *SaramaClient) updateLiveness(alive bool) {
463 // Post a consistent stream of liveness data to the channel,
464 // so that in a live state, the core does not timeout and
465 // send a forced liveness message. Production of liveness
466 // events to the channel is rate-limited by livenessChannelInterval.
467 if sc.liveness != nil {
468 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500469 logger.Info("update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700470 sc.liveness <- alive
471 sc.lastLivenessTime = time.Now()
472 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
khenaidoob332f9b2020-01-16 16:25:26 -0500473 logger.Info("update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700474 sc.liveness <- alive
475 sc.lastLivenessTime = time.Now()
476 }
477 }
478
479 // Only emit a log message when the state changes
480 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500481 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700482 sc.alive = alive
483 }
484}
485
Scott Baker0fef6982019-12-12 09:49:42 -0800486// Once unhealthy, we never go back
487func (sc *SaramaClient) setUnhealthy() {
488 sc.healthy = false
489 if sc.healthiness != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500490 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800491 sc.healthiness <- sc.healthy
492 }
493}
494
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800495func (sc *SaramaClient) isLivenessError(err error) bool {
496 // Sarama producers and consumers encapsulate the error inside
497 // a ProducerError or ConsumerError struct.
498 if prodError, ok := err.(*sarama.ProducerError); ok {
499 err = prodError.Err
500 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
501 err = consumerError.Err
502 }
503
504 // Sarama-Cluster will compose the error into a ClusterError struct,
505 // which we can't do a compare by reference. To handle that, we the
506 // best we can do is compare the error strings.
507
508 switch err.Error() {
509 case context.DeadlineExceeded.Error():
khenaidoob332f9b2020-01-16 16:25:26 -0500510 logger.Info("is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800511 return true
512 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
khenaidoob332f9b2020-01-16 16:25:26 -0500513 logger.Info("is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800514 return true
515 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
khenaidoob332f9b2020-01-16 16:25:26 -0500516 logger.Info("is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800517 return true
518 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
khenaidoob332f9b2020-01-16 16:25:26 -0500519 logger.Info("is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800520 return true
521 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
khenaidoob332f9b2020-01-16 16:25:26 -0500522 logger.Info("is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800523 return true
524 }
525
526 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
khenaidoob332f9b2020-01-16 16:25:26 -0500527 logger.Info("is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800528 return true
529 }
530
Scott Baker718bee02020-01-07 09:52:02 -0800531 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
khenaidoob332f9b2020-01-16 16:25:26 -0500532 logger.Info("is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800533 return true
534 }
535
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800536 // Other errors shouldn't trigger a loss of liveness
537
khenaidoob332f9b2020-01-16 16:25:26 -0500538 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800539
540 return false
541}
542
Scott Baker2c1c4822019-10-16 11:02:41 -0700543// send formats and sends the request onto the kafka messaging bus.
544func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
545
546 // Assert message is a proto message
547 var protoMsg proto.Message
548 var ok bool
549 // ascertain the value interface type is a proto.Message
550 if protoMsg, ok = msg.(proto.Message); !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500551 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Scott Baker2c1c4822019-10-16 11:02:41 -0700552 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
553 }
554
555 var marshalled []byte
556 var err error
557 // Create the Sarama producer message
558 if marshalled, err = proto.Marshal(protoMsg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500559 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700560 return err
561 }
562 key := ""
563 if len(keys) > 0 {
564 key = keys[0] // Only the first key is relevant
565 }
566 kafkaMsg := &sarama.ProducerMessage{
567 Topic: topic.Name,
568 Key: sarama.StringEncoder(key),
569 Value: sarama.ByteEncoder(marshalled),
570 }
571
572 // Send message to kafka
573 sc.producer.Input() <- kafkaMsg
574 // Wait for result
575 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
576 select {
577 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500578 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700579 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700580 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500581 logger.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800582 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700583 sc.updateLiveness(false)
584 }
585 return notOk
586 }
587 return nil
588}
589
590// Enable the liveness monitor channel. This channel will report
591// a "true" or "false" on every publish, which indicates whether
592// or not the channel is still live. This channel is then picked up
593// by the service (i.e. rw_core / ro_core) to update readiness status
594// and/or take other actions.
595func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500596 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700597 if enable {
598 if sc.liveness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500599 logger.Info("kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700600 // At least 1, so we can immediately post to it without blocking
601 // Setting a bigger number (10) allows the monitor to fall behind
602 // without blocking others. The monitor shouldn't really fall
603 // behind...
604 sc.liveness = make(chan bool, 10)
605 // post intial state to the channel
606 sc.liveness <- sc.alive
607 }
608 } else {
609 // TODO: Think about whether we need the ability to turn off
610 // liveness monitoring
611 panic("Turning off liveness reporting is not supported")
612 }
613 return sc.liveness
614}
615
Scott Baker0fef6982019-12-12 09:49:42 -0800616// Enable the Healthiness monitor channel. This channel will report "false"
617// if the kafka consumers die, or some other problem occurs which is
618// catastrophic that would require re-creating the client.
619func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500620 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800621 if enable {
622 if sc.healthiness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500623 logger.Info("kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800624 // At least 1, so we can immediately post to it without blocking
625 // Setting a bigger number (10) allows the monitor to fall behind
626 // without blocking others. The monitor shouldn't really fall
627 // behind...
628 sc.healthiness = make(chan bool, 10)
629 // post intial state to the channel
630 sc.healthiness <- sc.healthy
631 }
632 } else {
633 // TODO: Think about whether we need the ability to turn off
634 // liveness monitoring
635 panic("Turning off healthiness reporting is not supported")
636 }
637 return sc.healthiness
638}
639
Scott Baker104b67d2019-10-29 15:56:27 -0700640// send an empty message on the liveness channel to check whether connectivity has
641// been restored.
642func (sc *SaramaClient) SendLiveness() error {
643 if !sc.started {
644 return fmt.Errorf("SendLiveness() called while not started")
645 }
646
647 kafkaMsg := &sarama.ProducerMessage{
648 Topic: "_liveness_test",
649 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
650 }
651
652 // Send message to kafka
653 sc.producer.Input() <- kafkaMsg
654 // Wait for result
655 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
656 select {
657 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500658 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700659 sc.updateLiveness(true)
660 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500661 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800662 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700663 sc.updateLiveness(false)
664 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700665 return notOk
666 }
667 return nil
668}
669
670// getGroupId returns the group id from the key-value args.
671func getGroupId(kvArgs ...*KVArg) string {
672 for _, arg := range kvArgs {
673 if arg.Key == GroupIdKey {
674 return arg.Value.(string)
675 }
676 }
677 return ""
678}
679
680// getOffset returns the offset from the key-value args.
681func getOffset(kvArgs ...*KVArg) int64 {
682 for _, arg := range kvArgs {
683 if arg.Key == Offset {
684 return arg.Value.(int64)
685 }
686 }
687 return sarama.OffsetNewest
688}
689
690func (sc *SaramaClient) createClusterAdmin() error {
691 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
692 config := sarama.NewConfig()
693 config.Version = sarama.V1_0_0_0
694
695 // Create a cluster Admin
696 var cAdmin sarama.ClusterAdmin
697 var err error
698 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500699 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
Scott Baker2c1c4822019-10-16 11:02:41 -0700700 return err
701 }
702 sc.cAdmin = cAdmin
703 return nil
704}
705
706func (sc *SaramaClient) lockTopic(topic *Topic) {
707 sc.lockOfTopicLockMap.Lock()
708 if _, exist := sc.topicLockMap[topic.Name]; exist {
709 sc.lockOfTopicLockMap.Unlock()
710 sc.topicLockMap[topic.Name].Lock()
711 } else {
712 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
713 sc.lockOfTopicLockMap.Unlock()
714 sc.topicLockMap[topic.Name].Lock()
715 }
716}
717
718func (sc *SaramaClient) unLockTopic(topic *Topic) {
719 sc.lockOfTopicLockMap.Lock()
720 defer sc.lockOfTopicLockMap.Unlock()
721 if _, exist := sc.topicLockMap[topic.Name]; exist {
722 sc.topicLockMap[topic.Name].Unlock()
723 }
724}
725
726func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
727 sc.lockTopicToConsumerChannelMap.Lock()
728 defer sc.lockTopicToConsumerChannelMap.Unlock()
729 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
730 sc.topicToConsumerChannelMap[id] = arg
731 }
732}
733
734func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
735 sc.lockTopicToConsumerChannelMap.Lock()
736 defer sc.lockTopicToConsumerChannelMap.Unlock()
737 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
738 delete(sc.topicToConsumerChannelMap, id)
739 }
740}
741
742func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
743 sc.lockTopicToConsumerChannelMap.RLock()
744 defer sc.lockTopicToConsumerChannelMap.RUnlock()
745
746 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
747 return consumerCh
748 }
749 return nil
750}
751
752func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
753 sc.lockTopicToConsumerChannelMap.Lock()
754 defer sc.lockTopicToConsumerChannelMap.Unlock()
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 consumerCh.channels = append(consumerCh.channels, ch)
757 return
758 }
khenaidoob332f9b2020-01-16 16:25:26 -0500759 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700760}
761
762//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
763func closeConsumers(consumers []interface{}) error {
764 var err error
765 for _, consumer := range consumers {
766 // Is it a partition consumers?
767 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
768 if errTemp := partionConsumer.Close(); errTemp != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500769 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700770 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
771 // This can occur on race condition
772 err = nil
773 } else {
774 err = errTemp
775 }
776 }
777 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
778 if errTemp := groupConsumer.Close(); errTemp != nil {
779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 }
787 }
788 return err
789}
790
791func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
792 sc.lockTopicToConsumerChannelMap.Lock()
793 defer sc.lockTopicToConsumerChannelMap.Unlock()
794 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
795 // Channel will be closed in the removeChannel method
796 consumerCh.channels = removeChannel(consumerCh.channels, ch)
797 // If there are no more channels then we can close the consumers itself
798 if len(consumerCh.channels) == 0 {
khenaidoob332f9b2020-01-16 16:25:26 -0500799 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700800 err := closeConsumers(consumerCh.consumers)
801 //err := consumerCh.consumers.Close()
802 delete(sc.topicToConsumerChannelMap, topic.Name)
803 return err
804 }
805 return nil
806 }
khenaidoob332f9b2020-01-16 16:25:26 -0500807 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700808 return errors.New("topic-does-not-exist")
809}
810
811func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
812 sc.lockTopicToConsumerChannelMap.Lock()
813 defer sc.lockTopicToConsumerChannelMap.Unlock()
814 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
815 for _, ch := range consumerCh.channels {
816 // Channel will be closed in the removeChannel method
817 removeChannel(consumerCh.channels, ch)
818 }
819 err := closeConsumers(consumerCh.consumers)
820 //if err == sarama.ErrUnknownTopicOrPartition {
821 // // Not an error
822 // err = nil
823 //}
824 //err := consumerCh.consumers.Close()
825 delete(sc.topicToConsumerChannelMap, topic.Name)
826 return err
827 }
khenaidoob332f9b2020-01-16 16:25:26 -0500828 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700829 return nil
830}
831
832func (sc *SaramaClient) clearConsumerChannelMap() error {
833 sc.lockTopicToConsumerChannelMap.Lock()
834 defer sc.lockTopicToConsumerChannelMap.Unlock()
835 var err error
836 for topic, consumerCh := range sc.topicToConsumerChannelMap {
837 for _, ch := range consumerCh.channels {
838 // Channel will be closed in the removeChannel method
839 removeChannel(consumerCh.channels, ch)
840 }
841 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
842 err = errTemp
843 }
844 //err = consumerCh.consumers.Close()
845 delete(sc.topicToConsumerChannelMap, topic)
846 }
847 return err
848}
849
850//createPublisher creates the publisher which is used to send a message onto kafka
851func (sc *SaramaClient) createPublisher() error {
852 // This Creates the publisher
853 config := sarama.NewConfig()
854 config.Producer.Partitioner = sarama.NewRandomPartitioner
855 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
856 config.Producer.Flush.Messages = sc.producerFlushMessages
857 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
858 config.Producer.Return.Errors = sc.producerReturnErrors
859 config.Producer.Return.Successes = sc.producerReturnSuccess
860 //config.Producer.RequiredAcks = sarama.WaitForAll
861 config.Producer.RequiredAcks = sarama.WaitForLocal
862
863 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
864 brokers := []string{kafkaFullAddr}
865
866 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500867 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700868 return err
869 } else {
870 sc.producer = producer
871 }
khenaidoob332f9b2020-01-16 16:25:26 -0500872 logger.Info("Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700873 return nil
874}
875
876func (sc *SaramaClient) createConsumer() error {
877 config := sarama.NewConfig()
878 config.Consumer.Return.Errors = true
879 config.Consumer.Fetch.Min = 1
880 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
881 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
882 config.Consumer.Offsets.Initial = sarama.OffsetNewest
883 config.Metadata.Retry.Max = sc.metadataMaxRetry
884 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
885 brokers := []string{kafkaFullAddr}
886
887 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500888 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700889 return err
890 } else {
891 sc.consumer = consumer
892 }
khenaidoob332f9b2020-01-16 16:25:26 -0500893 logger.Info("Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700894 return nil
895}
896
897// createGroupConsumer creates a consumers group
898func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
899 config := scc.NewConfig()
900 config.ClientID = uuid.New().String()
901 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700902 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
903 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700904 //config.Group.Return.Notifications = false
905 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
906 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
907 config.Consumer.Offsets.Initial = initialOffset
908 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
909 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
910 brokers := []string{kafkaFullAddr}
911
912 topics := []string{topic.Name}
913 var consumer *scc.Consumer
914 var err error
915
916 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500917 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700918 return nil, err
919 }
khenaidoob332f9b2020-01-16 16:25:26 -0500920 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700921
922 //sc.groupConsumers[topic.Name] = consumer
923 sc.addToGroupConsumers(topic.Name, consumer)
924 return consumer, nil
925}
926
927// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
928// topic via the unique channel each subscriber received during subscription
929func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
930 // Need to go over all channels and publish messages to them - do we need to copy msg?
931 sc.lockTopicToConsumerChannelMap.RLock()
932 defer sc.lockTopicToConsumerChannelMap.RUnlock()
933 for _, ch := range consumerCh.channels {
934 go func(c chan *ic.InterContainerMessage) {
935 c <- protoMessage
936 }(ch)
937 }
938}
939
940func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500941 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700942startloop:
943 for {
944 select {
945 case err, ok := <-consumer.Errors():
946 if ok {
cbabud4978652019-12-04 08:04:21 +0100947 if sc.isLivenessError(err) {
948 sc.updateLiveness(false)
khenaidoob332f9b2020-01-16 16:25:26 -0500949 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100950 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700951 } else {
952 // Channel is closed
953 break startloop
954 }
955 case msg, ok := <-consumer.Messages():
khenaidoob332f9b2020-01-16 16:25:26 -0500956 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700957 if !ok {
958 // channel is closed
959 break startloop
960 }
961 msgBody := msg.Value
cbabud4978652019-12-04 08:04:21 +0100962 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -0500963 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700964 icm := &ic.InterContainerMessage{}
965 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500966 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700967 continue
968 }
969 go sc.dispatchToConsumers(consumerChnls, icm)
970 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500971 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700972 break startloop
973 }
974 }
khenaidoob332f9b2020-01-16 16:25:26 -0500975 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -0800976 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -0700977}
978
979func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500980 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700981
982startloop:
983 for {
984 select {
985 case err, ok := <-consumer.Errors():
986 if ok {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800987 if sc.isLivenessError(err) {
988 sc.updateLiveness(false)
989 }
khenaidoob332f9b2020-01-16 16:25:26 -0500990 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700991 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500992 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700993 // channel is closed
994 break startloop
995 }
996 case msg, ok := <-consumer.Messages():
997 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500998 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700999 // Channel closed
1000 break startloop
1001 }
Scott Baker104b67d2019-10-29 15:56:27 -07001002 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -05001003 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001004 msgBody := msg.Value
1005 icm := &ic.InterContainerMessage{}
1006 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001007 logger.Warnw("invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001008 continue
1009 }
1010 go sc.dispatchToConsumers(consumerChnls, icm)
1011 consumer.MarkOffset(msg, "")
1012 case ntf := <-consumer.Notifications():
khenaidoob332f9b2020-01-16 16:25:26 -05001013 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001014 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -05001015 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001016 break startloop
1017 }
1018 }
khenaidoob332f9b2020-01-16 16:25:26 -05001019 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -08001020 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -07001021}
1022
1023func (sc *SaramaClient) startConsumers(topic *Topic) error {
khenaidoob332f9b2020-01-16 16:25:26 -05001024 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001025 var consumerCh *consumerChannels
1026 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001027 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001028 return errors.New("consumers-not-exist")
1029 }
1030 // For each consumer listening for that topic, start a consumption loop
1031 for _, consumer := range consumerCh.consumers {
1032 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1033 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1034 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1035 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1036 } else {
khenaidoob332f9b2020-01-16 16:25:26 -05001037 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001038 return errors.New("invalid-consumer")
1039 }
1040 }
1041 return nil
1042}
1043
1044//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1045//// for that topic. It also starts the routine that listens for messages on that topic.
1046func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1047 var pConsumers []sarama.PartitionConsumer
1048 var err error
1049
1050 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001051 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001052 return nil, err
1053 }
1054
1055 consumersIf := make([]interface{}, 0)
1056 for _, pConsumer := range pConsumers {
1057 consumersIf = append(consumersIf, pConsumer)
1058 }
1059
1060 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1061 // unbuffered to verify race conditions.
1062 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1063 cc := &consumerChannels{
1064 consumers: consumersIf,
1065 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1066 }
1067
1068 // Add the consumers channel to the map
1069 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1070
1071 //Start a consumers to listen on that specific topic
1072 go sc.startConsumers(topic)
1073
1074 return consumerListeningChannel, nil
1075}
1076
1077// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1078// for that topic. It also starts the routine that listens for messages on that topic.
1079func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1080 // TODO: Replace this development partition consumers with a group consumers
1081 var pConsumer *scc.Consumer
1082 var err error
1083 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001084 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001085 return nil, err
1086 }
1087 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1088 // unbuffered to verify race conditions.
1089 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1090 cc := &consumerChannels{
1091 consumers: []interface{}{pConsumer},
1092 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1093 }
1094
1095 // Add the consumers channel to the map
1096 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1097
1098 //Start a consumers to listen on that specific topic
1099 go sc.startConsumers(topic)
1100
1101 return consumerListeningChannel, nil
1102}
1103
1104func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoob332f9b2020-01-16 16:25:26 -05001105 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001106 partitionList, err := sc.consumer.Partitions(topic.Name)
1107 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001108 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001109 return nil, err
1110 }
1111
1112 pConsumers := make([]sarama.PartitionConsumer, 0)
1113 for _, partition := range partitionList {
1114 var pConsumer sarama.PartitionConsumer
1115 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001116 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001117 return nil, err
1118 }
1119 pConsumers = append(pConsumers, pConsumer)
1120 }
1121 return pConsumers, nil
1122}
1123
1124func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1125 var i int
1126 var channel chan *ic.InterContainerMessage
1127 for i, channel = range channels {
1128 if channel == ch {
1129 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1130 close(channel)
khenaidoob332f9b2020-01-16 16:25:26 -05001131 logger.Debug("channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001132 return channels[:len(channels)-1]
1133 }
1134 }
1135 return channels
1136}
1137
1138func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1139 sc.lockOfGroupConsumers.Lock()
1140 defer sc.lockOfGroupConsumers.Unlock()
1141 if _, exist := sc.groupConsumers[topic]; !exist {
1142 sc.groupConsumers[topic] = consumer
1143 }
1144}
1145
1146func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1147 sc.lockOfGroupConsumers.Lock()
1148 defer sc.lockOfGroupConsumers.Unlock()
1149 if _, exist := sc.groupConsumers[topic]; exist {
1150 consumer := sc.groupConsumers[topic]
1151 delete(sc.groupConsumers, topic)
1152 if err := consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001153 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001154 return err
1155 }
1156 }
1157 return nil
1158}