blob: 9d4ab5209848b523ab625f0e1b574b202462e6df [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022 "strings"
23 "sync"
24 "time"
25
Scott Baker2c1c4822019-10-16 11:02:41 -070026 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
serkant.uluderyab38671c2019-11-01 09:35:38 -070031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070033)
34
// returnErrorFunction is the type of a callback that takes no arguments and
// returns only an error.
// NOTE(review): nothing in the visible part of this file uses it — confirm it
// is still referenced elsewhere in the package.
type returnErrorFunction func() error

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	// consumers holds sarama.PartitionConsumer and/or *scc.Consumer values;
	// those are the two types closeConsumers knows how to close.
	consumers []interface{}
	// channels are the subscriber channels; Subscribe appends a new one each
	// time an already-consumed topic is subscribed to again.
	channels []chan *ic.InterContainerMessage
}
44
45// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
48 client sarama.Client
49 KafkaHost string
50 KafkaPort int
51 producer sarama.AsyncProducer
52 consumer sarama.Consumer
53 groupConsumers map[string]*scc.Consumer
54 lockOfGroupConsumers sync.RWMutex
55 consumerGroupPrefix string
56 consumerType int
57 consumerGroupName string
58 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
70 doneCh chan int
71 topicToConsumerChannelMap map[string]*consumerChannels
72 lockTopicToConsumerChannelMap sync.RWMutex
73 topicLockMap map[string]*sync.RWMutex
74 lockOfTopicLockMap sync.RWMutex
75 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070076 alive bool
77 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
Scott Baker0fef6982019-12-12 09:49:42 -080081 healthy bool
82 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070083}
84
85type SaramaClientOption func(*SaramaClient)
86
87func Host(host string) SaramaClientOption {
88 return func(args *SaramaClient) {
89 args.KafkaHost = host
90 }
91}
92
93func Port(port int) SaramaClientOption {
94 return func(args *SaramaClient) {
95 args.KafkaPort = port
96 }
97}
98
99func ConsumerGroupPrefix(prefix string) SaramaClientOption {
100 return func(args *SaramaClient) {
101 args.consumerGroupPrefix = prefix
102 }
103}
104
105func ConsumerGroupName(name string) SaramaClientOption {
106 return func(args *SaramaClient) {
107 args.consumerGroupName = name
108 }
109}
110
111func ConsumerType(consumer int) SaramaClientOption {
112 return func(args *SaramaClient) {
113 args.consumerType = consumer
114 }
115}
116
117func ProducerFlushFrequency(frequency int) SaramaClientOption {
118 return func(args *SaramaClient) {
119 args.producerFlushFrequency = frequency
120 }
121}
122
123func ProducerFlushMessages(num int) SaramaClientOption {
124 return func(args *SaramaClient) {
125 args.producerFlushMessages = num
126 }
127}
128
129func ProducerFlushMaxMessages(num int) SaramaClientOption {
130 return func(args *SaramaClient) {
131 args.producerFlushMaxmessages = num
132 }
133}
134
135func ProducerMaxRetries(num int) SaramaClientOption {
136 return func(args *SaramaClient) {
137 args.producerRetryMax = num
138 }
139}
140
141func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
142 return func(args *SaramaClient) {
143 args.producerRetryBackOff = duration
144 }
145}
146
147func ProducerReturnOnErrors(opt bool) SaramaClientOption {
148 return func(args *SaramaClient) {
149 args.producerReturnErrors = opt
150 }
151}
152
153func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
154 return func(args *SaramaClient) {
155 args.producerReturnSuccess = opt
156 }
157}
158
159func ConsumerMaxWait(wait int) SaramaClientOption {
160 return func(args *SaramaClient) {
161 args.consumerMaxwait = wait
162 }
163}
164
165func MaxProcessingTime(pTime int) SaramaClientOption {
166 return func(args *SaramaClient) {
167 args.maxProcessingTime = pTime
168 }
169}
170
171func NumPartitions(number int) SaramaClientOption {
172 return func(args *SaramaClient) {
173 args.numPartitions = number
174 }
175}
176
177func NumReplicas(number int) SaramaClientOption {
178 return func(args *SaramaClient) {
179 args.numReplicas = number
180 }
181}
182
183func AutoCreateTopic(opt bool) SaramaClientOption {
184 return func(args *SaramaClient) {
185 args.autoCreateTopic = opt
186 }
187}
188
189func MetadatMaxRetries(retry int) SaramaClientOption {
190 return func(args *SaramaClient) {
191 args.metadataMaxRetry = retry
192 }
193}
194
Scott Baker104b67d2019-10-29 15:56:27 -0700195func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
196 return func(args *SaramaClient) {
197 args.livenessChannelInterval = opt
198 }
199}
200
Scott Baker2c1c4822019-10-16 11:02:41 -0700201func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
202 client := &SaramaClient{
203 KafkaHost: DefaultKafkaHost,
204 KafkaPort: DefaultKafkaPort,
205 }
206 client.consumerType = DefaultConsumerType
207 client.producerFlushFrequency = DefaultProducerFlushFrequency
208 client.producerFlushMessages = DefaultProducerFlushMessages
209 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
210 client.producerReturnErrors = DefaultProducerReturnErrors
211 client.producerReturnSuccess = DefaultProducerReturnSuccess
212 client.producerRetryMax = DefaultProducerRetryMax
213 client.producerRetryBackOff = DefaultProducerRetryBackoff
214 client.consumerMaxwait = DefaultConsumerMaxwait
215 client.maxProcessingTime = DefaultMaxProcessingTime
216 client.numPartitions = DefaultNumberPartitions
217 client.numReplicas = DefaultNumberReplicas
218 client.autoCreateTopic = DefaultAutoCreateTopic
219 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700220 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700221
222 for _, option := range opts {
223 option(client)
224 }
225
226 client.groupConsumers = make(map[string]*scc.Consumer)
227
228 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
229 client.topicLockMap = make(map[string]*sync.RWMutex)
230 client.lockOfTopicLockMap = sync.RWMutex{}
231 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700232
Scott Baker0fef6982019-12-12 09:49:42 -0800233 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700234 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800235 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700236
Scott Baker2c1c4822019-10-16 11:02:41 -0700237 return client
238}
239
240func (sc *SaramaClient) Start() error {
khenaidoob332f9b2020-01-16 16:25:26 -0500241 logger.Info("Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
246 var err error
247
248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500257 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 return err
259 }
260
261 // Create the Publisher
262 if err := sc.createPublisher(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500263 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700264 return err
265 }
266
267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500270 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700271 return err
272 }
273 }
274
275 // Create the topic to consumers/channel map
276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
khenaidoob332f9b2020-01-16 16:25:26 -0500278 logger.Info("kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700279
Scott Baker104b67d2019-10-29 15:56:27 -0700280 sc.started = true
281
Scott Baker2c1c4822019-10-16 11:02:41 -0700282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
khenaidoob332f9b2020-01-16 16:25:26 -0500286 logger.Info("stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700287
Scott Baker104b67d2019-10-29 15:56:27 -0700288 sc.started = false
289
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500295 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700296 }
297 }
298
299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500301 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700302 }
303 }
304
305 for key, val := range sc.groupConsumers {
khenaidoob332f9b2020-01-16 16:25:26 -0500306 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700307 if err := val.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500308 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500314 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700315 }
316 }
317
318 //TODO: Clear the consumers map
319 //sc.clearConsumerChannelMap()
320
khenaidoob332f9b2020-01-16 16:25:26 -0500321 logger.Info("sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700322}
323
324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
khenaidoob332f9b2020-01-16 16:25:26 -0500338 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700339 return nil
340 }
khenaidoob332f9b2020-01-16 16:25:26 -0500341 logger.Errorw("create-topic-failure", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
khenaidoob332f9b2020-01-16 16:25:26 -0500346 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700347 return nil
348}
349
350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
khenaidoob332f9b2020-01-16 16:25:26 -0500368 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700369 return nil
370 }
khenaidoob332f9b2020-01-16 16:25:26 -0500371 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500377 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
khenaidoob332f9b2020-01-16 16:25:26 -0500389 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500393 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700394 // Create a channel specific for that consumers and add it to the consumers channel map
395 ch := make(chan *ic.InterContainerMessage)
396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
401 var consumerListeningChannel chan *ic.InterContainerMessage
402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500408 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700409 return nil, err
410 }
411 }
412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500413 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500421 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500435 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700436 return nil, err
437 }
438
439 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500440 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
khenaidoob332f9b2020-01-16 16:25:26 -0500452 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500455 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500458 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700459 }
460 return err
461}
462
Scott Baker104b67d2019-10-29 15:56:27 -0700463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500470 logger.Info("update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
khenaidoob332f9b2020-01-16 16:25:26 -0500474 logger.Info("update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
khenaidoob332f9b2020-01-16 16:25:26 -0500482 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700483 sc.alive = alive
484 }
485}
486
Scott Baker0fef6982019-12-12 09:49:42 -0800487// Once unhealthy, we never go back
488func (sc *SaramaClient) setUnhealthy() {
489 sc.healthy = false
490 if sc.healthiness != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500491 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800492 sc.healthiness <- sc.healthy
493 }
494}
495
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800496func (sc *SaramaClient) isLivenessError(err error) bool {
497 // Sarama producers and consumers encapsulate the error inside
498 // a ProducerError or ConsumerError struct.
499 if prodError, ok := err.(*sarama.ProducerError); ok {
500 err = prodError.Err
501 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
502 err = consumerError.Err
503 }
504
505 // Sarama-Cluster will compose the error into a ClusterError struct,
506 // which we can't do a compare by reference. To handle that, we the
507 // best we can do is compare the error strings.
508
509 switch err.Error() {
510 case context.DeadlineExceeded.Error():
khenaidoob332f9b2020-01-16 16:25:26 -0500511 logger.Info("is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800512 return true
513 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
khenaidoob332f9b2020-01-16 16:25:26 -0500514 logger.Info("is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800515 return true
516 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
khenaidoob332f9b2020-01-16 16:25:26 -0500517 logger.Info("is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800518 return true
519 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
khenaidoob332f9b2020-01-16 16:25:26 -0500520 logger.Info("is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800521 return true
522 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
khenaidoob332f9b2020-01-16 16:25:26 -0500523 logger.Info("is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800524 return true
525 }
526
527 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
khenaidoob332f9b2020-01-16 16:25:26 -0500528 logger.Info("is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800529 return true
530 }
531
Scott Baker718bee02020-01-07 09:52:02 -0800532 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
khenaidoob332f9b2020-01-16 16:25:26 -0500533 logger.Info("is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800534 return true
535 }
536
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800537 // Other errors shouldn't trigger a loss of liveness
538
khenaidoob332f9b2020-01-16 16:25:26 -0500539 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800540
541 return false
542}
543
Scott Baker2c1c4822019-10-16 11:02:41 -0700544// send formats and sends the request onto the kafka messaging bus.
545func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
546
547 // Assert message is a proto message
548 var protoMsg proto.Message
549 var ok bool
550 // ascertain the value interface type is a proto.Message
551 if protoMsg, ok = msg.(proto.Message); !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500552 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Scott Baker2c1c4822019-10-16 11:02:41 -0700553 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
554 }
555
556 var marshalled []byte
557 var err error
558 // Create the Sarama producer message
559 if marshalled, err = proto.Marshal(protoMsg); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500560 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700561 return err
562 }
563 key := ""
564 if len(keys) > 0 {
565 key = keys[0] // Only the first key is relevant
566 }
567 kafkaMsg := &sarama.ProducerMessage{
568 Topic: topic.Name,
569 Key: sarama.StringEncoder(key),
570 Value: sarama.ByteEncoder(marshalled),
571 }
572
573 // Send message to kafka
574 sc.producer.Input() <- kafkaMsg
575 // Wait for result
576 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
577 select {
578 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500579 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700580 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700581 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500582 logger.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800583 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700584 sc.updateLiveness(false)
585 }
586 return notOk
587 }
588 return nil
589}
590
591// Enable the liveness monitor channel. This channel will report
592// a "true" or "false" on every publish, which indicates whether
593// or not the channel is still live. This channel is then picked up
594// by the service (i.e. rw_core / ro_core) to update readiness status
595// and/or take other actions.
596func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500597 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700598 if enable {
599 if sc.liveness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500600 logger.Info("kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700601 // At least 1, so we can immediately post to it without blocking
602 // Setting a bigger number (10) allows the monitor to fall behind
603 // without blocking others. The monitor shouldn't really fall
604 // behind...
605 sc.liveness = make(chan bool, 10)
606 // post intial state to the channel
607 sc.liveness <- sc.alive
608 }
609 } else {
610 // TODO: Think about whether we need the ability to turn off
611 // liveness monitoring
612 panic("Turning off liveness reporting is not supported")
613 }
614 return sc.liveness
615}
616
Scott Baker0fef6982019-12-12 09:49:42 -0800617// Enable the Healthiness monitor channel. This channel will report "false"
618// if the kafka consumers die, or some other problem occurs which is
619// catastrophic that would require re-creating the client.
620func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
khenaidoob332f9b2020-01-16 16:25:26 -0500621 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800622 if enable {
623 if sc.healthiness == nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500624 logger.Info("kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800625 // At least 1, so we can immediately post to it without blocking
626 // Setting a bigger number (10) allows the monitor to fall behind
627 // without blocking others. The monitor shouldn't really fall
628 // behind...
629 sc.healthiness = make(chan bool, 10)
630 // post intial state to the channel
631 sc.healthiness <- sc.healthy
632 }
633 } else {
634 // TODO: Think about whether we need the ability to turn off
635 // liveness monitoring
636 panic("Turning off healthiness reporting is not supported")
637 }
638 return sc.healthiness
639}
640
Scott Baker104b67d2019-10-29 15:56:27 -0700641// send an empty message on the liveness channel to check whether connectivity has
642// been restored.
643func (sc *SaramaClient) SendLiveness() error {
644 if !sc.started {
645 return fmt.Errorf("SendLiveness() called while not started")
646 }
647
648 kafkaMsg := &sarama.ProducerMessage{
649 Topic: "_liveness_test",
650 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
651 }
652
653 // Send message to kafka
654 sc.producer.Input() <- kafkaMsg
655 // Wait for result
656 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
657 select {
658 case ok := <-sc.producer.Successes():
khenaidoob332f9b2020-01-16 16:25:26 -0500659 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700660 sc.updateLiveness(true)
661 case notOk := <-sc.producer.Errors():
khenaidoob332f9b2020-01-16 16:25:26 -0500662 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800663 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700664 sc.updateLiveness(false)
665 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700666 return notOk
667 }
668 return nil
669}
670
671// getGroupId returns the group id from the key-value args.
672func getGroupId(kvArgs ...*KVArg) string {
673 for _, arg := range kvArgs {
674 if arg.Key == GroupIdKey {
675 return arg.Value.(string)
676 }
677 }
678 return ""
679}
680
681// getOffset returns the offset from the key-value args.
682func getOffset(kvArgs ...*KVArg) int64 {
683 for _, arg := range kvArgs {
684 if arg.Key == Offset {
685 return arg.Value.(int64)
686 }
687 }
688 return sarama.OffsetNewest
689}
690
691func (sc *SaramaClient) createClusterAdmin() error {
692 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
693 config := sarama.NewConfig()
694 config.Version = sarama.V1_0_0_0
695
696 // Create a cluster Admin
697 var cAdmin sarama.ClusterAdmin
698 var err error
699 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500700 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 return err
702 }
703 sc.cAdmin = cAdmin
704 return nil
705}
706
707func (sc *SaramaClient) lockTopic(topic *Topic) {
708 sc.lockOfTopicLockMap.Lock()
709 if _, exist := sc.topicLockMap[topic.Name]; exist {
710 sc.lockOfTopicLockMap.Unlock()
711 sc.topicLockMap[topic.Name].Lock()
712 } else {
713 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
714 sc.lockOfTopicLockMap.Unlock()
715 sc.topicLockMap[topic.Name].Lock()
716 }
717}
718
719func (sc *SaramaClient) unLockTopic(topic *Topic) {
720 sc.lockOfTopicLockMap.Lock()
721 defer sc.lockOfTopicLockMap.Unlock()
722 if _, exist := sc.topicLockMap[topic.Name]; exist {
723 sc.topicLockMap[topic.Name].Unlock()
724 }
725}
726
727func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
728 sc.lockTopicToConsumerChannelMap.Lock()
729 defer sc.lockTopicToConsumerChannelMap.Unlock()
730 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
731 sc.topicToConsumerChannelMap[id] = arg
732 }
733}
734
735func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
739 delete(sc.topicToConsumerChannelMap, id)
740 }
741}
742
743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
753func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
khenaidoob332f9b2020-01-16 16:25:26 -0500760 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700761}
762
763//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
764func closeConsumers(consumers []interface{}) error {
765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumers?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500770 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur on race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
778 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur on race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
790}
791
792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
797 consumerCh.channels = removeChannel(consumerCh.channels, ch)
798 // If there are no more channels then we can close the consumers itself
799 if len(consumerCh.channels) == 0 {
khenaidoob332f9b2020-01-16 16:25:26 -0500800 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700801 err := closeConsumers(consumerCh.consumers)
802 //err := consumerCh.consumers.Close()
803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
khenaidoob332f9b2020-01-16 16:25:26 -0500808 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700809 return errors.New("topic-does-not-exist")
810}
811
812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
818 removeChannel(consumerCh.channels, ch)
819 }
820 err := closeConsumers(consumerCh.consumers)
821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
khenaidoob332f9b2020-01-16 16:25:26 -0500829 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700830 return nil
831}
832
833func (sc *SaramaClient) clearConsumerChannelMap() error {
834 sc.lockTopicToConsumerChannelMap.Lock()
835 defer sc.lockTopicToConsumerChannelMap.Unlock()
836 var err error
837 for topic, consumerCh := range sc.topicToConsumerChannelMap {
838 for _, ch := range consumerCh.channels {
839 // Channel will be closed in the removeChannel method
840 removeChannel(consumerCh.channels, ch)
841 }
842 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
843 err = errTemp
844 }
845 //err = consumerCh.consumers.Close()
846 delete(sc.topicToConsumerChannelMap, topic)
847 }
848 return err
849}
850
851//createPublisher creates the publisher which is used to send a message onto kafka
852func (sc *SaramaClient) createPublisher() error {
853 // This Creates the publisher
854 config := sarama.NewConfig()
855 config.Producer.Partitioner = sarama.NewRandomPartitioner
856 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
857 config.Producer.Flush.Messages = sc.producerFlushMessages
858 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
859 config.Producer.Return.Errors = sc.producerReturnErrors
860 config.Producer.Return.Successes = sc.producerReturnSuccess
861 //config.Producer.RequiredAcks = sarama.WaitForAll
862 config.Producer.RequiredAcks = sarama.WaitForLocal
863
864 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
865 brokers := []string{kafkaFullAddr}
866
867 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500868 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700869 return err
870 } else {
871 sc.producer = producer
872 }
khenaidoob332f9b2020-01-16 16:25:26 -0500873 logger.Info("Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700874 return nil
875}
876
877func (sc *SaramaClient) createConsumer() error {
878 config := sarama.NewConfig()
879 config.Consumer.Return.Errors = true
880 config.Consumer.Fetch.Min = 1
881 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
882 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
883 config.Consumer.Offsets.Initial = sarama.OffsetNewest
884 config.Metadata.Retry.Max = sc.metadataMaxRetry
885 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
886 brokers := []string{kafkaFullAddr}
887
888 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500889 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700890 return err
891 } else {
892 sc.consumer = consumer
893 }
khenaidoob332f9b2020-01-16 16:25:26 -0500894 logger.Info("Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700895 return nil
896}
897
898// createGroupConsumer creates a consumers group
899func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
900 config := scc.NewConfig()
901 config.ClientID = uuid.New().String()
902 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700903 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
904 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700905 //config.Group.Return.Notifications = false
906 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
907 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
908 config.Consumer.Offsets.Initial = initialOffset
909 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
910 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
911 brokers := []string{kafkaFullAddr}
912
913 topics := []string{topic.Name}
914 var consumer *scc.Consumer
915 var err error
916
917 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500918 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700919 return nil, err
920 }
khenaidoob332f9b2020-01-16 16:25:26 -0500921 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700922
923 //sc.groupConsumers[topic.Name] = consumer
924 sc.addToGroupConsumers(topic.Name, consumer)
925 return consumer, nil
926}
927
928// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
929// topic via the unique channel each subscriber received during subscription
930func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
931 // Need to go over all channels and publish messages to them - do we need to copy msg?
932 sc.lockTopicToConsumerChannelMap.RLock()
933 defer sc.lockTopicToConsumerChannelMap.RUnlock()
934 for _, ch := range consumerCh.channels {
935 go func(c chan *ic.InterContainerMessage) {
936 c <- protoMessage
937 }(ch)
938 }
939}
940
941func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500942 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700943startloop:
944 for {
945 select {
946 case err, ok := <-consumer.Errors():
947 if ok {
cbabud4978652019-12-04 08:04:21 +0100948 if sc.isLivenessError(err) {
949 sc.updateLiveness(false)
khenaidoob332f9b2020-01-16 16:25:26 -0500950 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100951 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700952 } else {
953 // Channel is closed
954 break startloop
955 }
956 case msg, ok := <-consumer.Messages():
khenaidoob332f9b2020-01-16 16:25:26 -0500957 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700958 if !ok {
959 // channel is closed
960 break startloop
961 }
962 msgBody := msg.Value
cbabud4978652019-12-04 08:04:21 +0100963 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -0500964 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700965 icm := &ic.InterContainerMessage{}
966 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -0500967 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700968 continue
969 }
970 go sc.dispatchToConsumers(consumerChnls, icm)
971 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -0500972 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700973 break startloop
974 }
975 }
khenaidoob332f9b2020-01-16 16:25:26 -0500976 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -0800977 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -0700978}
979
980func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
khenaidoob332f9b2020-01-16 16:25:26 -0500981 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700982
983startloop:
984 for {
985 select {
986 case err, ok := <-consumer.Errors():
987 if ok {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800988 if sc.isLivenessError(err) {
989 sc.updateLiveness(false)
990 }
khenaidoob332f9b2020-01-16 16:25:26 -0500991 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700992 } else {
khenaidoob332f9b2020-01-16 16:25:26 -0500993 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700994 // channel is closed
995 break startloop
996 }
997 case msg, ok := <-consumer.Messages():
998 if !ok {
khenaidoob332f9b2020-01-16 16:25:26 -0500999 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001000 // Channel closed
1001 break startloop
1002 }
Scott Baker104b67d2019-10-29 15:56:27 -07001003 sc.updateLiveness(true)
khenaidoob332f9b2020-01-16 16:25:26 -05001004 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001005 msgBody := msg.Value
1006 icm := &ic.InterContainerMessage{}
1007 if err := proto.Unmarshal(msgBody, icm); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001008 logger.Warnw("invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001009 continue
1010 }
1011 go sc.dispatchToConsumers(consumerChnls, icm)
1012 consumer.MarkOffset(msg, "")
1013 case ntf := <-consumer.Notifications():
khenaidoob332f9b2020-01-16 16:25:26 -05001014 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
Scott Baker2c1c4822019-10-16 11:02:41 -07001015 case <-sc.doneCh:
khenaidoob332f9b2020-01-16 16:25:26 -05001016 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001017 break startloop
1018 }
1019 }
khenaidoob332f9b2020-01-16 16:25:26 -05001020 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
Scott Baker0fef6982019-12-12 09:49:42 -08001021 sc.setUnhealthy()
Scott Baker2c1c4822019-10-16 11:02:41 -07001022}
1023
1024func (sc *SaramaClient) startConsumers(topic *Topic) error {
khenaidoob332f9b2020-01-16 16:25:26 -05001025 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001026 var consumerCh *consumerChannels
1027 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001028 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001029 return errors.New("consumers-not-exist")
1030 }
1031 // For each consumer listening for that topic, start a consumption loop
1032 for _, consumer := range consumerCh.consumers {
1033 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1034 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1035 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1036 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1037 } else {
khenaidoob332f9b2020-01-16 16:25:26 -05001038 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001039 return errors.New("invalid-consumer")
1040 }
1041 }
1042 return nil
1043}
1044
1045//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1046//// for that topic. It also starts the routine that listens for messages on that topic.
1047func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1048 var pConsumers []sarama.PartitionConsumer
1049 var err error
1050
1051 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001052 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001053 return nil, err
1054 }
1055
1056 consumersIf := make([]interface{}, 0)
1057 for _, pConsumer := range pConsumers {
1058 consumersIf = append(consumersIf, pConsumer)
1059 }
1060
1061 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1062 // unbuffered to verify race conditions.
1063 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1064 cc := &consumerChannels{
1065 consumers: consumersIf,
1066 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1067 }
1068
1069 // Add the consumers channel to the map
1070 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1071
1072 //Start a consumers to listen on that specific topic
1073 go sc.startConsumers(topic)
1074
1075 return consumerListeningChannel, nil
1076}
1077
1078// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1079// for that topic. It also starts the routine that listens for messages on that topic.
1080func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1081 // TODO: Replace this development partition consumers with a group consumers
1082 var pConsumer *scc.Consumer
1083 var err error
1084 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001085 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001086 return nil, err
1087 }
1088 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1089 // unbuffered to verify race conditions.
1090 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1091 cc := &consumerChannels{
1092 consumers: []interface{}{pConsumer},
1093 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1094 }
1095
1096 // Add the consumers channel to the map
1097 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1098
1099 //Start a consumers to listen on that specific topic
1100 go sc.startConsumers(topic)
1101
1102 return consumerListeningChannel, nil
1103}
1104
1105func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoob332f9b2020-01-16 16:25:26 -05001106 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001107 partitionList, err := sc.consumer.Partitions(topic.Name)
1108 if err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001109 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001110 return nil, err
1111 }
1112
1113 pConsumers := make([]sarama.PartitionConsumer, 0)
1114 for _, partition := range partitionList {
1115 var pConsumer sarama.PartitionConsumer
1116 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001117 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001118 return nil, err
1119 }
1120 pConsumers = append(pConsumers, pConsumer)
1121 }
1122 return pConsumers, nil
1123}
1124
1125func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1126 var i int
1127 var channel chan *ic.InterContainerMessage
1128 for i, channel = range channels {
1129 if channel == ch {
1130 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1131 close(channel)
khenaidoob332f9b2020-01-16 16:25:26 -05001132 logger.Debug("channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001133 return channels[:len(channels)-1]
1134 }
1135 }
1136 return channels
1137}
1138
1139func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1140 sc.lockOfGroupConsumers.Lock()
1141 defer sc.lockOfGroupConsumers.Unlock()
1142 if _, exist := sc.groupConsumers[topic]; !exist {
1143 sc.groupConsumers[topic] = consumer
1144 }
1145}
1146
1147func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1148 sc.lockOfGroupConsumers.Lock()
1149 defer sc.lockOfGroupConsumers.Unlock()
1150 if _, exist := sc.groupConsumers[topic]; exist {
1151 consumer := sc.groupConsumers[topic]
1152 delete(sc.groupConsumers, topic)
1153 if err := consumer.Close(); err != nil {
khenaidoob332f9b2020-01-16 16:25:26 -05001154 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001155 return err
1156 }
1157 }
1158 return nil
1159}