blob: 9d4ab5209848b523ab625f0e1b574b202462e6df [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
Esin Karamanccb714b2019-11-29 15:02:06 +000022 "strings"
23 "sync"
24 "time"
25
kdarapub26b4502019-10-05 03:02:33 +053026 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040027 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000028 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040033)
34
// returnErrorFunction is a parameterless callback whose only result is an error.
type returnErrorFunction func() error
36
37// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
38// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
39//consumer or a group consumer
40type consumerChannels struct {
41 consumers []interface{}
42 channels []chan *ic.InterContainerMessage
43}
44
45// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
48 client sarama.Client
49 KafkaHost string
50 KafkaPort int
51 producer sarama.AsyncProducer
52 consumer sarama.Consumer
53 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040054 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040055 consumerGroupPrefix string
56 consumerType int
57 consumerGroupName string
58 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
70 doneCh chan int
71 topicToConsumerChannelMap map[string]*consumerChannels
72 lockTopicToConsumerChannelMap sync.RWMutex
73 topicLockMap map[string]*sync.RWMutex
74 lockOfTopicLockMap sync.RWMutex
Mahir Gunyele77977b2019-06-27 05:36:22 -070075 metadataMaxRetry int
cbabu95f21522019-11-13 14:25:18 +010076 alive bool
77 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
Scott Baker86fce9a2019-12-12 09:47:17 -080081 healthy bool
82 healthiness chan bool
William Kurkianea869482019-04-09 15:16:11 -040083}
84
85type SaramaClientOption func(*SaramaClient)
86
87func Host(host string) SaramaClientOption {
88 return func(args *SaramaClient) {
89 args.KafkaHost = host
90 }
91}
92
93func Port(port int) SaramaClientOption {
94 return func(args *SaramaClient) {
95 args.KafkaPort = port
96 }
97}
98
99func ConsumerGroupPrefix(prefix string) SaramaClientOption {
100 return func(args *SaramaClient) {
101 args.consumerGroupPrefix = prefix
102 }
103}
104
105func ConsumerGroupName(name string) SaramaClientOption {
106 return func(args *SaramaClient) {
107 args.consumerGroupName = name
108 }
109}
110
111func ConsumerType(consumer int) SaramaClientOption {
112 return func(args *SaramaClient) {
113 args.consumerType = consumer
114 }
115}
116
117func ProducerFlushFrequency(frequency int) SaramaClientOption {
118 return func(args *SaramaClient) {
119 args.producerFlushFrequency = frequency
120 }
121}
122
123func ProducerFlushMessages(num int) SaramaClientOption {
124 return func(args *SaramaClient) {
125 args.producerFlushMessages = num
126 }
127}
128
129func ProducerFlushMaxMessages(num int) SaramaClientOption {
130 return func(args *SaramaClient) {
131 args.producerFlushMaxmessages = num
132 }
133}
134
135func ProducerMaxRetries(num int) SaramaClientOption {
136 return func(args *SaramaClient) {
137 args.producerRetryMax = num
138 }
139}
140
141func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
142 return func(args *SaramaClient) {
143 args.producerRetryBackOff = duration
144 }
145}
146
147func ProducerReturnOnErrors(opt bool) SaramaClientOption {
148 return func(args *SaramaClient) {
149 args.producerReturnErrors = opt
150 }
151}
152
153func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
154 return func(args *SaramaClient) {
155 args.producerReturnSuccess = opt
156 }
157}
158
159func ConsumerMaxWait(wait int) SaramaClientOption {
160 return func(args *SaramaClient) {
161 args.consumerMaxwait = wait
162 }
163}
164
165func MaxProcessingTime(pTime int) SaramaClientOption {
166 return func(args *SaramaClient) {
167 args.maxProcessingTime = pTime
168 }
169}
170
171func NumPartitions(number int) SaramaClientOption {
172 return func(args *SaramaClient) {
173 args.numPartitions = number
174 }
175}
176
177func NumReplicas(number int) SaramaClientOption {
178 return func(args *SaramaClient) {
179 args.numReplicas = number
180 }
181}
182
183func AutoCreateTopic(opt bool) SaramaClientOption {
184 return func(args *SaramaClient) {
185 args.autoCreateTopic = opt
186 }
187}
188
Mahir Gunyele77977b2019-06-27 05:36:22 -0700189func MetadatMaxRetries(retry int) SaramaClientOption {
190 return func(args *SaramaClient) {
191 args.metadataMaxRetry = retry
192 }
193}
194
cbabu95f21522019-11-13 14:25:18 +0100195func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
196 return func(args *SaramaClient) {
197 args.livenessChannelInterval = opt
198 }
199}
200
William Kurkianea869482019-04-09 15:16:11 -0400201func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
202 client := &SaramaClient{
203 KafkaHost: DefaultKafkaHost,
204 KafkaPort: DefaultKafkaPort,
205 }
206 client.consumerType = DefaultConsumerType
207 client.producerFlushFrequency = DefaultProducerFlushFrequency
208 client.producerFlushMessages = DefaultProducerFlushMessages
209 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
210 client.producerReturnErrors = DefaultProducerReturnErrors
211 client.producerReturnSuccess = DefaultProducerReturnSuccess
212 client.producerRetryMax = DefaultProducerRetryMax
213 client.producerRetryBackOff = DefaultProducerRetryBackoff
214 client.consumerMaxwait = DefaultConsumerMaxwait
215 client.maxProcessingTime = DefaultMaxProcessingTime
216 client.numPartitions = DefaultNumberPartitions
217 client.numReplicas = DefaultNumberReplicas
218 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700219 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100220 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400221
222 for _, option := range opts {
223 option(client)
224 }
225
226 client.groupConsumers = make(map[string]*scc.Consumer)
227
228 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
229 client.topicLockMap = make(map[string]*sync.RWMutex)
230 client.lockOfTopicLockMap = sync.RWMutex{}
231 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100232
Scott Baker86fce9a2019-12-12 09:47:17 -0800233 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100234 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800235 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100236
William Kurkianea869482019-04-09 15:16:11 -0400237 return client
238}
239
240func (sc *SaramaClient) Start() error {
Esin Karamanccb714b2019-11-29 15:02:06 +0000241 logger.Info("Starting-kafka-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
246 var err error
247
Devmalya Paul495b94a2019-08-27 19:42:00 -0400248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
William Kurkianea869482019-04-09 15:16:11 -0400255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000257 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400258 return err
259 }
260
261 // Create the Publisher
262 if err := sc.createPublisher(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000263 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400264 return err
265 }
266
267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000270 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400271 return err
272 }
273 }
274
275 // Create the topic to consumers/channel map
276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
Esin Karamanccb714b2019-11-29 15:02:06 +0000278 logger.Info("kafka-sarama-client-started")
William Kurkianea869482019-04-09 15:16:11 -0400279
cbabu95f21522019-11-13 14:25:18 +0100280 sc.started = true
281
William Kurkianea869482019-04-09 15:16:11 -0400282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
Esin Karamanccb714b2019-11-29 15:02:06 +0000286 logger.Info("stopping-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400287
cbabu95f21522019-11-13 14:25:18 +0100288 sc.started = false
289
William Kurkianea869482019-04-09 15:16:11 -0400290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000295 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400296 }
297 }
298
299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000301 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400302 }
303 }
304
305 for key, val := range sc.groupConsumers {
Esin Karamanccb714b2019-11-29 15:02:06 +0000306 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400307 if err := val.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000308 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000314 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400315 }
316 }
317
318 //TODO: Clear the consumers map
319 //sc.clearConsumerChannelMap()
320
Esin Karamanccb714b2019-11-29 15:02:06 +0000321 logger.Info("sarama-client-stopped")
William Kurkianea869482019-04-09 15:16:11 -0400322}
323
324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
Esin Karamanccb714b2019-11-29 15:02:06 +0000338 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400339 return nil
340 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000341 logger.Errorw("create-topic-failure", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
Esin Karamanccb714b2019-11-29 15:02:06 +0000346 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
William Kurkianea869482019-04-09 15:16:11 -0400347 return nil
348}
349
350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
Esin Karamanccb714b2019-11-29 15:02:06 +0000368 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400369 return nil
370 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000371 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000377 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
Esin Karamanccb714b2019-11-29 15:02:06 +0000389 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000393 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400394 // Create a channel specific for that consumers and add it to the consumers channel map
395 ch := make(chan *ic.InterContainerMessage)
396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
401 var consumerListeningChannel chan *ic.InterContainerMessage
402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000408 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400409 return nil, err
410 }
411 }
412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000413 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000421 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000435 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400436 return nil, err
437 }
438
439 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000440 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
William Kurkianea869482019-04-09 15:16:11 -0400441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
Esin Karamanccb714b2019-11-29 15:02:06 +0000452 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000455 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000458 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400459 }
460 return err
461}
462
cbabu95f21522019-11-13 14:25:18 +0100463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000470 logger.Info("update-liveness-channel-because-change")
cbabu95f21522019-11-13 14:25:18 +0100471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
Esin Karamanccb714b2019-11-29 15:02:06 +0000474 logger.Info("update-liveness-channel-because-interval")
cbabu95f21522019-11-13 14:25:18 +0100475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
Esin Karamanccb714b2019-11-29 15:02:06 +0000482 logger.Info("set-client-alive", log.Fields{"alive": alive})
cbabu95f21522019-11-13 14:25:18 +0100483 sc.alive = alive
484 }
485}
486
Scott Baker86fce9a2019-12-12 09:47:17 -0800487// Once unhealthy, we never go back
488func (sc *SaramaClient) setUnhealthy() {
489 sc.healthy = false
490 if sc.healthiness != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000491 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker86fce9a2019-12-12 09:47:17 -0800492 sc.healthiness <- sc.healthy
493 }
494}
495
Devmalya Pauldd23a992019-11-14 07:06:31 +0000496func (sc *SaramaClient) isLivenessError(err error) bool {
497 // Sarama producers and consumers encapsulate the error inside
498 // a ProducerError or ConsumerError struct.
499 if prodError, ok := err.(*sarama.ProducerError); ok {
500 err = prodError.Err
501 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
502 err = consumerError.Err
503 }
504
505 // Sarama-Cluster will compose the error into a ClusterError struct,
506 // which we can't do a compare by reference. To handle that, we the
507 // best we can do is compare the error strings.
508
509 switch err.Error() {
510 case context.DeadlineExceeded.Error():
Esin Karamanccb714b2019-11-29 15:02:06 +0000511 logger.Info("is-liveness-error-timeout")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000512 return true
513 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Esin Karamanccb714b2019-11-29 15:02:06 +0000514 logger.Info("is-liveness-error-no-brokers")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000515 return true
516 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Esin Karamanccb714b2019-11-29 15:02:06 +0000517 logger.Info("is-liveness-error-shutting-down")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000518 return true
519 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Esin Karamanccb714b2019-11-29 15:02:06 +0000520 logger.Info("is-liveness-error-not-available")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000521 return true
522 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Esin Karamanccb714b2019-11-29 15:02:06 +0000523 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000524 return true
525 }
526
527 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Esin Karamanccb714b2019-11-29 15:02:06 +0000528 logger.Info("is-liveness-error-connection-refused")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000529 return true
530 }
531
Scott Bakeree7c0a02020-01-07 11:12:26 -0800532 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Esin Karamanccb714b2019-11-29 15:02:06 +0000533 logger.Info("is-liveness-error-io-timeout")
Scott Bakeree7c0a02020-01-07 11:12:26 -0800534 return true
535 }
536
Devmalya Pauldd23a992019-11-14 07:06:31 +0000537 // Other errors shouldn't trigger a loss of liveness
538
Esin Karamanccb714b2019-11-29 15:02:06 +0000539 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000540
541 return false
542}
543
William Kurkianea869482019-04-09 15:16:11 -0400544// send formats and sends the request onto the kafka messaging bus.
545func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
546
547 // Assert message is a proto message
548 var protoMsg proto.Message
549 var ok bool
550 // ascertain the value interface type is a proto.Message
551 if protoMsg, ok = msg.(proto.Message); !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000552 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
William Kurkianea869482019-04-09 15:16:11 -0400553 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
554 }
555
556 var marshalled []byte
557 var err error
558 // Create the Sarama producer message
559 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000560 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400561 return err
562 }
563 key := ""
564 if len(keys) > 0 {
565 key = keys[0] // Only the first key is relevant
566 }
567 kafkaMsg := &sarama.ProducerMessage{
568 Topic: topic.Name,
569 Key: sarama.StringEncoder(key),
570 Value: sarama.ByteEncoder(marshalled),
571 }
572
573 // Send message to kafka
574 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400575 // Wait for result
576 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
577 select {
578 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000579 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100580 sc.updateLiveness(true)
William Kurkianea869482019-04-09 15:16:11 -0400581 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000582 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000583 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100584 sc.updateLiveness(false)
585 }
586 return notOk
587 }
588 return nil
589}
590
591// Enable the liveness monitor channel. This channel will report
592// a "true" or "false" on every publish, which indicates whether
593// or not the channel is still live. This channel is then picked up
594// by the service (i.e. rw_core / ro_core) to update readiness status
595// and/or take other actions.
596func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000597 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
cbabu95f21522019-11-13 14:25:18 +0100598 if enable {
599 if sc.liveness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000600 logger.Info("kafka-create-liveness-channel")
cbabu95f21522019-11-13 14:25:18 +0100601 // At least 1, so we can immediately post to it without blocking
602 // Setting a bigger number (10) allows the monitor to fall behind
603 // without blocking others. The monitor shouldn't really fall
604 // behind...
605 sc.liveness = make(chan bool, 10)
606 // post intial state to the channel
607 sc.liveness <- sc.alive
608 }
609 } else {
610 // TODO: Think about whether we need the ability to turn off
611 // liveness monitoring
612 panic("Turning off liveness reporting is not supported")
613 }
614 return sc.liveness
615}
616
Scott Baker86fce9a2019-12-12 09:47:17 -0800617// Enable the Healthiness monitor channel. This channel will report "false"
618// if the kafka consumers die, or some other problem occurs which is
619// catastrophic that would require re-creating the client.
620func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
Esin Karamanccb714b2019-11-29 15:02:06 +0000621 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker86fce9a2019-12-12 09:47:17 -0800622 if enable {
623 if sc.healthiness == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000624 logger.Info("kafka-create-healthiness-channel")
Scott Baker86fce9a2019-12-12 09:47:17 -0800625 // At least 1, so we can immediately post to it without blocking
626 // Setting a bigger number (10) allows the monitor to fall behind
627 // without blocking others. The monitor shouldn't really fall
628 // behind...
629 sc.healthiness = make(chan bool, 10)
630 // post intial state to the channel
631 sc.healthiness <- sc.healthy
632 }
633 } else {
634 // TODO: Think about whether we need the ability to turn off
635 // liveness monitoring
636 panic("Turning off healthiness reporting is not supported")
637 }
638 return sc.healthiness
639}
640
cbabu95f21522019-11-13 14:25:18 +0100641// send an empty message on the liveness channel to check whether connectivity has
642// been restored.
643func (sc *SaramaClient) SendLiveness() error {
644 if !sc.started {
645 return fmt.Errorf("SendLiveness() called while not started")
646 }
647
648 kafkaMsg := &sarama.ProducerMessage{
649 Topic: "_liveness_test",
650 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
651 }
652
653 // Send message to kafka
654 sc.producer.Input() <- kafkaMsg
655 // Wait for result
656 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
657 select {
658 case ok := <-sc.producer.Successes():
Esin Karamanccb714b2019-11-29 15:02:06 +0000659 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
cbabu95f21522019-11-13 14:25:18 +0100660 sc.updateLiveness(true)
661 case notOk := <-sc.producer.Errors():
Esin Karamanccb714b2019-11-29 15:02:06 +0000662 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000663 if sc.isLivenessError(notOk) {
cbabu95f21522019-11-13 14:25:18 +0100664 sc.updateLiveness(false)
665 }
William Kurkianea869482019-04-09 15:16:11 -0400666 return notOk
667 }
668 return nil
669}
670
671// getGroupId returns the group id from the key-value args.
672func getGroupId(kvArgs ...*KVArg) string {
673 for _, arg := range kvArgs {
674 if arg.Key == GroupIdKey {
675 return arg.Value.(string)
676 }
677 }
678 return ""
679}
680
681// getOffset returns the offset from the key-value args.
682func getOffset(kvArgs ...*KVArg) int64 {
683 for _, arg := range kvArgs {
684 if arg.Key == Offset {
685 return arg.Value.(int64)
686 }
687 }
688 return sarama.OffsetNewest
689}
690
691func (sc *SaramaClient) createClusterAdmin() error {
692 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
693 config := sarama.NewConfig()
694 config.Version = sarama.V1_0_0_0
695
696 // Create a cluster Admin
697 var cAdmin sarama.ClusterAdmin
698 var err error
699 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000700 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
William Kurkianea869482019-04-09 15:16:11 -0400701 return err
702 }
703 sc.cAdmin = cAdmin
704 return nil
705}
706
707func (sc *SaramaClient) lockTopic(topic *Topic) {
708 sc.lockOfTopicLockMap.Lock()
709 if _, exist := sc.topicLockMap[topic.Name]; exist {
710 sc.lockOfTopicLockMap.Unlock()
711 sc.topicLockMap[topic.Name].Lock()
712 } else {
713 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
714 sc.lockOfTopicLockMap.Unlock()
715 sc.topicLockMap[topic.Name].Lock()
716 }
717}
718
719func (sc *SaramaClient) unLockTopic(topic *Topic) {
720 sc.lockOfTopicLockMap.Lock()
721 defer sc.lockOfTopicLockMap.Unlock()
722 if _, exist := sc.topicLockMap[topic.Name]; exist {
723 sc.topicLockMap[topic.Name].Unlock()
724 }
725}
726
727func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
728 sc.lockTopicToConsumerChannelMap.Lock()
729 defer sc.lockTopicToConsumerChannelMap.Unlock()
730 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
731 sc.topicToConsumerChannelMap[id] = arg
732 }
733}
734
735func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
739 delete(sc.topicToConsumerChannelMap, id)
740 }
741}
742
743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
753func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000760 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400761}
762
763//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
764func closeConsumers(consumers []interface{}) error {
765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumers?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000770 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
William Kurkianea869482019-04-09 15:16:11 -0400771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur on race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
778 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur on race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
790}
791
792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
797 consumerCh.channels = removeChannel(consumerCh.channels, ch)
798 // If there are no more channels then we can close the consumers itself
799 if len(consumerCh.channels) == 0 {
Esin Karamanccb714b2019-11-29 15:02:06 +0000800 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -0400801 err := closeConsumers(consumerCh.consumers)
802 //err := consumerCh.consumers.Close()
803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000808 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400809 return errors.New("topic-does-not-exist")
810}
811
812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
818 removeChannel(consumerCh.channels, ch)
819 }
820 err := closeConsumers(consumerCh.consumers)
821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000829 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400830 return nil
831}
832
833func (sc *SaramaClient) clearConsumerChannelMap() error {
834 sc.lockTopicToConsumerChannelMap.Lock()
835 defer sc.lockTopicToConsumerChannelMap.Unlock()
836 var err error
837 for topic, consumerCh := range sc.topicToConsumerChannelMap {
838 for _, ch := range consumerCh.channels {
839 // Channel will be closed in the removeChannel method
840 removeChannel(consumerCh.channels, ch)
841 }
842 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
843 err = errTemp
844 }
845 //err = consumerCh.consumers.Close()
846 delete(sc.topicToConsumerChannelMap, topic)
847 }
848 return err
849}
850
851//createPublisher creates the publisher which is used to send a message onto kafka
852func (sc *SaramaClient) createPublisher() error {
853 // This Creates the publisher
854 config := sarama.NewConfig()
855 config.Producer.Partitioner = sarama.NewRandomPartitioner
856 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
857 config.Producer.Flush.Messages = sc.producerFlushMessages
858 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
859 config.Producer.Return.Errors = sc.producerReturnErrors
860 config.Producer.Return.Successes = sc.producerReturnSuccess
861 //config.Producer.RequiredAcks = sarama.WaitForAll
862 config.Producer.RequiredAcks = sarama.WaitForLocal
863
864 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
865 brokers := []string{kafkaFullAddr}
866
867 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000868 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400869 return err
870 } else {
871 sc.producer = producer
872 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000873 logger.Info("Kafka-publisher-created")
William Kurkianea869482019-04-09 15:16:11 -0400874 return nil
875}
876
877func (sc *SaramaClient) createConsumer() error {
878 config := sarama.NewConfig()
879 config.Consumer.Return.Errors = true
880 config.Consumer.Fetch.Min = 1
881 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
882 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
883 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700884 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400885 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
886 brokers := []string{kafkaFullAddr}
887
888 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000889 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400890 return err
891 } else {
892 sc.consumer = consumer
893 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000894 logger.Info("Kafka-consumers-created")
William Kurkianea869482019-04-09 15:16:11 -0400895 return nil
896}
897
898// createGroupConsumer creates a consumers group
899func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
900 config := scc.NewConfig()
901 config.ClientID = uuid.New().String()
902 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100903 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
904 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400905 //config.Group.Return.Notifications = false
906 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
907 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
908 config.Consumer.Offsets.Initial = initialOffset
909 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
910 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
911 brokers := []string{kafkaFullAddr}
912
913 topics := []string{topic.Name}
914 var consumer *scc.Consumer
915 var err error
916
917 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000918 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400919 return nil, err
920 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000921 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400922
923 //sc.groupConsumers[topic.Name] = consumer
924 sc.addToGroupConsumers(topic.Name, consumer)
925 return consumer, nil
926}
927
928// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
929// topic via the unique channel each subscriber received during subscription
930func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
931 // Need to go over all channels and publish messages to them - do we need to copy msg?
932 sc.lockTopicToConsumerChannelMap.RLock()
933 defer sc.lockTopicToConsumerChannelMap.RUnlock()
934 for _, ch := range consumerCh.channels {
935 go func(c chan *ic.InterContainerMessage) {
936 c <- protoMessage
937 }(ch)
938 }
939}
940
941func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000942 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400943startloop:
944 for {
945 select {
946 case err, ok := <-consumer.Errors():
947 if ok {
cbabu116b73f2019-12-10 17:56:32 +0530948 if sc.isLivenessError(err) {
949 sc.updateLiveness(false)
Esin Karamanccb714b2019-11-29 15:02:06 +0000950 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530951 }
William Kurkianea869482019-04-09 15:16:11 -0400952 } else {
953 // Channel is closed
954 break startloop
955 }
956 case msg, ok := <-consumer.Messages():
Esin Karamanccb714b2019-11-29 15:02:06 +0000957 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400958 if !ok {
959 // channel is closed
960 break startloop
961 }
962 msgBody := msg.Value
cbabu116b73f2019-12-10 17:56:32 +0530963 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +0000964 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400965 icm := &ic.InterContainerMessage{}
966 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000967 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400968 continue
969 }
970 go sc.dispatchToConsumers(consumerChnls, icm)
971 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +0000972 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400973 break startloop
974 }
975 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000976 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -0800977 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -0400978}
979
980func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
Esin Karamanccb714b2019-11-29 15:02:06 +0000981 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400982
983startloop:
984 for {
985 select {
986 case err, ok := <-consumer.Errors():
987 if ok {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000988 if sc.isLivenessError(err) {
989 sc.updateLiveness(false)
990 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000991 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400992 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +0000993 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400994 // channel is closed
995 break startloop
996 }
997 case msg, ok := <-consumer.Messages():
998 if !ok {
Esin Karamanccb714b2019-11-29 15:02:06 +0000999 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001000 // Channel closed
1001 break startloop
1002 }
cbabu95f21522019-11-13 14:25:18 +01001003 sc.updateLiveness(true)
Esin Karamanccb714b2019-11-29 15:02:06 +00001004 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -04001005 msgBody := msg.Value
1006 icm := &ic.InterContainerMessage{}
1007 if err := proto.Unmarshal(msgBody, icm); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001008 logger.Warnw("invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001009 continue
1010 }
1011 go sc.dispatchToConsumers(consumerChnls, icm)
1012 consumer.MarkOffset(msg, "")
1013 case ntf := <-consumer.Notifications():
Esin Karamanccb714b2019-11-29 15:02:06 +00001014 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
William Kurkianea869482019-04-09 15:16:11 -04001015 case <-sc.doneCh:
Esin Karamanccb714b2019-11-29 15:02:06 +00001016 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001017 break startloop
1018 }
1019 }
Esin Karamanccb714b2019-11-29 15:02:06 +00001020 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker86fce9a2019-12-12 09:47:17 -08001021 sc.setUnhealthy()
William Kurkianea869482019-04-09 15:16:11 -04001022}
1023
1024func (sc *SaramaClient) startConsumers(topic *Topic) error {
Esin Karamanccb714b2019-11-29 15:02:06 +00001025 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001026 var consumerCh *consumerChannels
1027 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001028 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001029 return errors.New("consumers-not-exist")
1030 }
1031 // For each consumer listening for that topic, start a consumption loop
1032 for _, consumer := range consumerCh.consumers {
1033 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1034 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1035 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1036 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1037 } else {
Esin Karamanccb714b2019-11-29 15:02:06 +00001038 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -04001039 return errors.New("invalid-consumer")
1040 }
1041 }
1042 return nil
1043}
1044
1045//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1046//// for that topic. It also starts the routine that listens for messages on that topic.
1047func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1048 var pConsumers []sarama.PartitionConsumer
1049 var err error
1050
1051 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001052 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001053 return nil, err
1054 }
1055
1056 consumersIf := make([]interface{}, 0)
1057 for _, pConsumer := range pConsumers {
1058 consumersIf = append(consumersIf, pConsumer)
1059 }
1060
1061 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1062 // unbuffered to verify race conditions.
1063 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1064 cc := &consumerChannels{
1065 consumers: consumersIf,
1066 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1067 }
1068
1069 // Add the consumers channel to the map
1070 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1071
1072 //Start a consumers to listen on that specific topic
1073 go sc.startConsumers(topic)
1074
1075 return consumerListeningChannel, nil
1076}
1077
1078// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1079// for that topic. It also starts the routine that listens for messages on that topic.
1080func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1081 // TODO: Replace this development partition consumers with a group consumers
1082 var pConsumer *scc.Consumer
1083 var err error
1084 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001085 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001086 return nil, err
1087 }
1088 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1089 // unbuffered to verify race conditions.
1090 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1091 cc := &consumerChannels{
1092 consumers: []interface{}{pConsumer},
1093 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1094 }
1095
1096 // Add the consumers channel to the map
1097 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1098
1099 //Start a consumers to listen on that specific topic
1100 go sc.startConsumers(topic)
1101
1102 return consumerListeningChannel, nil
1103}
1104
1105func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
Esin Karamanccb714b2019-11-29 15:02:06 +00001106 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001107 partitionList, err := sc.consumer.Partitions(topic.Name)
1108 if err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001109 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001110 return nil, err
1111 }
1112
1113 pConsumers := make([]sarama.PartitionConsumer, 0)
1114 for _, partition := range partitionList {
1115 var pConsumer sarama.PartitionConsumer
1116 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001117 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001118 return nil, err
1119 }
1120 pConsumers = append(pConsumers, pConsumer)
1121 }
1122 return pConsumers, nil
1123}
1124
1125func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1126 var i int
1127 var channel chan *ic.InterContainerMessage
1128 for i, channel = range channels {
1129 if channel == ch {
1130 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1131 close(channel)
Esin Karamanccb714b2019-11-29 15:02:06 +00001132 logger.Debug("channel-closed")
William Kurkianea869482019-04-09 15:16:11 -04001133 return channels[:len(channels)-1]
1134 }
1135 }
1136 return channels
1137}
1138
William Kurkianea869482019-04-09 15:16:11 -04001139func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1140 sc.lockOfGroupConsumers.Lock()
1141 defer sc.lockOfGroupConsumers.Unlock()
1142 if _, exist := sc.groupConsumers[topic]; !exist {
1143 sc.groupConsumers[topic] = consumer
1144 }
1145}
1146
1147func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1148 sc.lockOfGroupConsumers.Lock()
1149 defer sc.lockOfGroupConsumers.Unlock()
1150 if _, exist := sc.groupConsumers[topic]; exist {
1151 consumer := sc.groupConsumers[topic]
1152 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001153 if err := consumer.Close(); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +00001154 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001155 return err
1156 }
1157 }
1158 return nil
1159}