/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	scc "github.com/bsm/sarama-cluster"
	"github.com/eapache/go-resiliency/breaker"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-lib-go/v7/pkg/log"
)

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan proto.Message
}

// static check to ensure SaramaClient implements Client
var _ Client = &SaramaClient{}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	KafkaAddress                  string
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumers                map[string]*scc.Consumer
	lockOfGroupConsumers          sync.RWMutex
	consumerGroupPrefix           string
	consumerType                  int
	consumerGroupName             string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	metadataCallback              func(fromTopic string, timestamp time.Time)
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int
	alive                         bool
	livenessMutex                 sync.Mutex
	liveness                      chan bool
	livenessChannelInterval       time.Duration
	lastLivenessTime              time.Time
	started                       bool
	healthinessMutex              sync.Mutex
	healthy                       bool
	healthiness                   chan bool
}

type SaramaClientOption func(*SaramaClient)

func Address(address string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaAddress = address
	}
}

func ConsumerGroupPrefix(prefix string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupPrefix = prefix
	}
}

func ConsumerGroupName(name string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupName = name
	}
}

func ConsumerType(consumer int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerType = consumer
	}
}

func ProducerFlushFrequency(frequency int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushFrequency = frequency
	}
}

func ProducerFlushMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMessages = num
	}
}

func ProducerFlushMaxMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMaxmessages = num
	}
}

func ProducerMaxRetries(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryMax = num
	}
}

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryBackOff = duration
	}
}

func ProducerReturnOnErrors(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnErrors = opt
	}
}

func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnSuccess = opt
	}
}

func ConsumerMaxWait(wait int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerMaxwait = wait
	}
}

func MaxProcessingTime(pTime int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.maxProcessingTime = pTime
	}
}

func NumPartitions(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numPartitions = number
	}
}

func NumReplicas(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numReplicas = number
	}
}

func AutoCreateTopic(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.autoCreateTopic = opt
	}
}

func MetadatMaxRetries(retry int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.metadataMaxRetry = retry
	}
}

func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.livenessChannelInterval = opt
	}
}

func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaAddress: DefaultKafkaAddress,
	}
	client.consumerType = DefaultConsumerType
	client.producerFlushFrequency = DefaultProducerFlushFrequency
	client.producerFlushMessages = DefaultProducerFlushMessages
	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
	client.producerReturnErrors = DefaultProducerReturnErrors
	client.producerReturnSuccess = DefaultProducerReturnSuccess
	client.producerRetryMax = DefaultProducerRetryMax
	client.producerRetryBackOff = DefaultProducerRetryBackoff
	client.consumerMaxwait = DefaultConsumerMaxwait
	client.maxProcessingTime = DefaultMaxProcessingTime
	client.numPartitions = DefaultNumberPartitions
	client.numReplicas = DefaultNumberReplicas
	client.autoCreateTopic = DefaultAutoCreateTopic
	client.metadataMaxRetry = DefaultMetadataMaxRetry
	client.livenessChannelInterval = DefaultLivenessChannelInterval

	for _, option := range opts {
		option(client)
	}

	client.groupConsumers = make(map[string]*scc.Consumer)

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
	client.topicLockMap = make(map[string]*sync.RWMutex)
	client.lockOfTopicLockMap = sync.RWMutex{}
	client.lockOfGroupConsumers = sync.RWMutex{}

	// healthy and alive until proven otherwise
	client.alive = true
	client.healthy = true

	return client
}
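
// Illustrative sketch (not part of the original source): one way a caller might
// construct and start this client using the options above. The broker address,
// option values and ctx are arbitrary example inputs; GroupCustomer and the
// Default* values are constants defined elsewhere in this package.
//
//	client := NewSaramaClient(
//		Address("kafka:9092"),
//		ConsumerType(GroupCustomer),
//		AutoCreateTopic(true),
//		NumPartitions(3),
//	)
//	if err := client.Start(ctx); err != nil {
//		return err
//	}
//	defer client.Stop(ctx)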

func (sc *SaramaClient) Start(ctx context.Context) error {
	logger.Info(ctx, "Starting-kafka-sarama-client")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	var err error

	// Add a cleanup in case of failure to startup
	defer func() {
		if err != nil {
			sc.Stop(ctx)
		}
	}()

	// Create the Cluster Admin
	if err = sc.createClusterAdmin(ctx); err != nil {
		logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
		return err
	}

	// Create the Publisher
	if err := sc.createPublisher(ctx); err != nil {
		logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	if sc.consumerType == DefaultConsumerType {
		// Create the master consumers
		if err := sc.createConsumer(ctx); err != nil {
			logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
			return err
		}
	}

	// Create the topic to consumers/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	logger.Info(ctx, "kafka-sarama-client-started")

	sc.started = true

	return nil
}

func (sc *SaramaClient) Stop(ctx context.Context) {
	logger.Info(ctx, "stopping-sarama-client")

	sc.started = false

	// Send a message over the done channel to close all long running routines
	sc.doneCh <- 1

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
		}
	}

	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
		}
	}

	for key, val := range sc.groupConsumers {
		logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
		if err := val.Close(); err != nil {
			logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
		}
	}

	if sc.cAdmin != nil {
		if err := sc.cAdmin.Close(); err != nil {
			logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
		}
	}

	// TODO: Clear the consumers map
	//sc.clearConsumerChannelMap()

	logger.Info(ctx, "sarama-client-stopped")
}

// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
// the invoking function must hold the lock
func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
	// Set the topic details
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)
	topicDetails := make(map[string]*sarama.TopicDetail)
	topicDetails[topic.Name] = topicDetail

	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
		if err == sarama.ErrTopicAlreadyExists {
			// Not an error
			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
		return err
	}
	// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
	// do so.
	logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}

// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
// ensure no two goroutines are performing operations on the same topic
func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	return sc.createTopic(ctx, topic, numPartition, repFactor)
}
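
// Illustrative sketch (not part of the original source): creating a topic with
// explicit partition and replication settings. The topic name, partition count
// and replication factor shown here are arbitrary example values.
//
//	topic := &Topic{Name: "voltha.events.example"}
//	if err := client.CreateTopic(ctx, topic, 3, 1); err != nil {
//		return err
//	}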

// DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	// Remove the topic from the broker
	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
		if err == sarama.ErrUnknownTopicOrPartition {
			// Not an error as it does not exist
			logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
		return err
	}

	// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
	if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
		logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
		logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific to that consumer and add it to the consumer channel map
		ch := make(chan proto.Message)
		sc.addChannelToConsumerChannelMap(ctx, topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan proto.Message
	var err error

	// Use the consumerType option to figure out the type of consumer to launch
	if sc.consumerType == PartitionConsumer {
		if sc.autoCreateTopic {
			if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
				logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
	} else if sc.consumerType == GroupCustomer {
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		//if sc.autoCreateTopic {
		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
		//		logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
		//		return nil, err
		//	}
		//}
		//groupId := sc.consumerGroupName
		groupId := getGroupId(kvArgs...)
		// Include the group prefix
		if groupId != "" {
			groupId = sc.consumerGroupPrefix + groupId
		} else {
			// Need to use a unique group Id per topic
			groupId = sc.consumerGroupPrefix + topic.Name
		}
		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
			return nil, err
		}

	} else {
		logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return consumerListeningChannel, nil
}
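
// Illustrative sketch (not part of the original source): subscribing to a topic
// and draining the returned channel in a goroutine. The topic name is arbitrary
// and handle() is a hypothetical caller-side handler; the concrete proto type of
// each message depends on what the publisher sent.
//
//	ch, err := client.Subscribe(ctx, &Topic{Name: "voltha.events.example"})
//	if err != nil {
//		return err
//	}
//	go func() {
//		for msg := range ch {
//			handle(msg) // hypothetical handler
//		}
//	}()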

// UnSubscribe unsubscribes a consumer from a given topic
func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	var err error
	if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
		logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
	}
	if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
		logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
	}
	return err
}

func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
	sc.metadataCallback = callback
}

func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
	// Post a consistent stream of liveness data to the channel,
	// so that in a live state, the core does not timeout and
	// send a forced liveness message. Production of liveness
	// events to the channel is rate-limited by livenessChannelInterval.
	sc.livenessMutex.Lock()
	defer sc.livenessMutex.Unlock()
	if sc.liveness != nil {
		if sc.alive != alive {
			logger.Info(ctx, "update-liveness-channel-because-change")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
			logger.Info(ctx, "update-liveness-channel-because-interval")
			sc.liveness <- alive
			sc.lastLivenessTime = time.Now()
		}
	}

	// Only emit a log message when the state changes
	if sc.alive != alive {
		logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
		sc.alive = alive
	}
}

// Once unhealthy, we never go back
func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
	sc.healthy = false
	sc.healthinessMutex.Lock()
	defer sc.healthinessMutex.Unlock()
	if sc.healthiness != nil {
		logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
		sc.healthiness <- sc.healthy
	}
}

func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
	// Sarama producers and consumers encapsulate the error inside
	// a ProducerError or ConsumerError struct.
	if prodError, ok := err.(*sarama.ProducerError); ok {
		err = prodError.Err
	} else if consumerError, ok := err.(*sarama.ConsumerError); ok {
		err = consumerError.Err
	}

	// Sarama-Cluster wraps the error inside a ClusterError struct, which
	// cannot be compared by reference. To handle that, the best we can do
	// is compare the error strings.

	switch err.Error() {
	case context.DeadlineExceeded.Error():
		logger.Info(ctx, "is-liveness-error-timeout")
		return true
	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
		logger.Info(ctx, "is-liveness-error-no-brokers")
		return true
	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
		logger.Info(ctx, "is-liveness-error-shutting-down")
		return true
	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
		logger.Info(ctx, "is-liveness-error-not-available")
		return true
	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
		logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
		return true
	}

	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
		logger.Info(ctx, "is-liveness-error-connection-refused")
		return true
	}

	if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
		logger.Info(ctx, "is-liveness-error-io-timeout")
		return true
	}

	// Other errors shouldn't trigger a loss of liveness

	logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})

	return false
}

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
		return fmt.Errorf("not-a-proto-msg-%s", msg)
	}

	var marshalled []byte
	var err error
	// Create the Sarama producer message
	if marshalled, err = proto.Marshal(protoMsg); err != nil {
		logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
		return err
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
		sc.updateLiveness(ctx, true)
	case notOk := <-sc.producer.Errors():
		logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
		if sc.isLivenessError(ctx, notOk) {
			sc.updateLiveness(ctx, false)
		}
		return notOk
	}
	return nil
}
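
// Illustrative sketch (not part of the original source): publishing a proto
// message with an optional partition key. "event" stands for any proto.Message
// the caller has built and deviceID is a hypothetical key used only for
// partitioning.
//
//	if err := client.Send(ctx, event, &Topic{Name: "voltha.events.example"}, deviceID); err != nil {
//		return err
//	}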

// EnableLivenessChannel enables the liveness monitor channel. This channel will report
// a "true" or "false" on every publish, which indicates whether
// or not the channel is still live. This channel is then picked up
// by the service (i.e. rw_core / ro_core) to update readiness status
// and/or take other actions.
func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
	if enable {
		sc.livenessMutex.Lock()
		defer sc.livenessMutex.Unlock()
		if sc.liveness == nil {
			logger.Info(ctx, "kafka-create-liveness-channel")
			// At least 1, so we can immediately post to it without blocking
			// Setting a bigger number (10) allows the monitor to fall behind
			// without blocking others. The monitor shouldn't really fall
			// behind...
			sc.liveness = make(chan bool, 10)
			// post initial state to the channel
			sc.liveness <- sc.alive
		}
	} else {
		// TODO: Think about whether we need the ability to turn off
		// liveness monitoring
		panic("Turning off liveness reporting is not supported")
	}
	return sc.liveness
}

// EnableHealthinessChannel enables the healthiness monitor channel. This channel will report "false"
// if the kafka consumers die, or some other problem occurs which is
// catastrophic that would require re-creating the client.
func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
	logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
	if enable {
		sc.healthinessMutex.Lock()
		defer sc.healthinessMutex.Unlock()
		if sc.healthiness == nil {
			logger.Info(ctx, "kafka-create-healthiness-channel")
			// At least 1, so we can immediately post to it without blocking
			// Setting a bigger number (10) allows the monitor to fall behind
			// without blocking others. The monitor shouldn't really fall
			// behind...
			sc.healthiness = make(chan bool, 10)
			// post initial state to the channel
			sc.healthiness <- sc.healthy
		}
	} else {
		// TODO: Think about whether we need the ability to turn off
		// healthiness monitoring
		panic("Turning off healthiness reporting is not supported")
	}
	return sc.healthiness
}

// SendLiveness publishes a small test message to kafka to check whether
// connectivity has been restored.
func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
	if !sc.started {
		return fmt.Errorf("SendLiveness() called while not started")
	}

	kafkaMsg := &sarama.ProducerMessage{
		Topic: "_liveness_test",
		Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
		sc.updateLiveness(ctx, true)
	case notOk := <-sc.producer.Errors():
		logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
		if sc.isLivenessError(ctx, notOk) {
			sc.updateLiveness(ctx, false)
		}
		return notOk
	}
	return nil
}
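
// Illustrative sketch (not part of the original source): how a service might
// consume the liveness channel and periodically probe kafka when no traffic is
// flowing. The probe interval and updateReadiness() hook are hypothetical
// example choices by the caller.
//
//	liveness := client.EnableLivenessChannel(ctx, true)
//	go func() {
//		for {
//			select {
//			case alive := <-liveness:
//				updateReadiness(alive) // hypothetical readiness hook
//			case <-time.After(10 * time.Second):
//				_ = client.SendLiveness(ctx)
//			}
//		}
//	}()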

// getGroupId returns the group id from the key-value args.
func getGroupId(kvArgs ...*KVArg) string {
	for _, arg := range kvArgs {
		if arg.Key == GroupIdKey {
			return arg.Value.(string)
		}
	}
	return ""
}

// getOffset returns the offset from the key-value args.
func getOffset(kvArgs ...*KVArg) int64 {
	for _, arg := range kvArgs {
		if arg.Key == Offset {
			return arg.Value.(int64)
		}
	}
	return sarama.OffsetNewest
}
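
// Illustrative sketch (not part of the original source): passing a consumer
// group id and a starting offset to Subscribe via KVArg, using the GroupIdKey
// and Offset keys consumed by the helpers above. The group name is an arbitrary
// example and the caller would import sarama for the offset constant.
//
//	ch, err := client.Subscribe(ctx, &Topic{Name: "voltha.events.example"},
//		&KVArg{Key: GroupIdKey, Value: "my-consumer-group"},
//		&KVArg{Key: Offset, Value: sarama.OffsetOldest})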

func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Create a cluster Admin
	var cAdmin sarama.ClusterAdmin
	var err error
	if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
		logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
		return err
	}
	sc.cAdmin = cAdmin
	return nil
}

func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	} else {
		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	}
}

func (sc *SaramaClient) unLockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	defer sc.lockOfTopicLockMap.Unlock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.topicLockMap[topic.Name].Unlock()
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}

// closeConsumers closes a list of sarama consumers. The consumers can either be partition consumers or group consumers
func closeConsumers(ctx context.Context, consumers []interface{}) error {
	var err error
	for _, consumer := range consumers {
		// Is it a partition consumer?
		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			if errTemp := partitionConsumer.Close(); errTemp != nil {
				logger.Debugw(ctx, "partition-consumer-close-failure", log.Fields{"err": errTemp})
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
			if errTemp := groupConsumer.Close(); errTemp != nil {
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		}
	}
	return err
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
		// If there are no more channels then we can close the consumers themselves
		if len(consumerCh.channels) == 0 {
			logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
			err := closeConsumers(ctx, consumerCh.consumers)
			//err := consumerCh.consumers.Close()
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		for _, ch := range consumerCh.channels {
			// Channel will be closed in the removeChannel method
			removeChannel(ctx, consumerCh.channels, ch)
		}
		err := closeConsumers(ctx, consumerCh.consumers)
		//if err == sarama.ErrUnknownTopicOrPartition {
		//	// Not an error
		//	err = nil
		//}
		//err := consumerCh.consumers.Close()
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
	return nil
}

// createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher(ctx context.Context) error {
	// This creates the publisher
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
	config.Producer.Flush.Messages = sc.producerFlushMessages
	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
	config.Producer.Return.Errors = sc.producerReturnErrors
	config.Producer.Return.Successes = sc.producerReturnSuccess
	//config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.RequiredAcks = sarama.WaitForLocal

	brokers := []string{sc.KafkaAddress}

	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
		logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
		return err
	} else {
		sc.producer = producer
	}
	logger.Info(ctx, "Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer(ctx context.Context) error {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Metadata.Retry.Max = sc.metadataMaxRetry
	brokers := []string{sc.KafkaAddress}

	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
		logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
		return err
	} else {
		sc.consumer = consumer
	}
	logger.Info(ctx, "Kafka-consumers-created")
	return nil
}

// createGroupConsumer creates a consumer group
func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.Version = sarama.V1_0_0_0
	config.ClientID = uuid.New().String()
	config.Group.Mode = scc.ConsumerModeMultiplex
	config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
	config.Consumer.Return.Errors = true
	//config.Group.Return.Notifications = false
	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = initialOffset
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	brokers := []string{sc.KafkaAddress}

	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
		logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
		return nil, err
	}
	logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})

	//sc.groupConsumers[topic.Name] = consumer
	sc.addToGroupConsumers(topic.Name, consumer)
	return consumer, nil
}

// dispatchToConsumers sends the proto message received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.RLock()
	for _, ch := range consumerCh.channels {
		go func(c chan proto.Message) {
			c <- protoMessage
		}(ch)
	}
	sc.lockTopicToConsumerChannelMap.RUnlock()

	if callback := sc.metadataCallback; callback != nil {
		callback(fromTopic, ts)
	}
}

func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				if sc.isLivenessError(ctx, err) {
					sc.updateLiveness(ctx, false)
					logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
				}
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			sc.updateLiveness(ctx, true)
			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			var protoMsg proto.Message
			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
		case <-sc.doneCh:
			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy(ctx)
}

func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				if sc.isLivenessError(ctx, err) {
					sc.updateLiveness(ctx, false)
				}
				logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
				// Channel closed
				break startloop
			}
			sc.updateLiveness(ctx, true)
			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			var protoMsg proto.Message
			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
	sc.setUnhealthy(ctx)
}

func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
	logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
		return errors.New("consumers-not-exist")
	}
	// For each consumer listening for that topic, start a consumption loop
	for _, consumer := range consumerCh.consumers {
		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
			go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
		} else {
			logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
			return errors.New("invalid-consumer")
		}
	}
	return nil
}

// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
	var pConsumers []sarama.PartitionConsumer
	var err error

	if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	consumersIf := make([]interface{}, 0)
	for _, pConsumer := range pConsumers {
		consumersIf = append(consumersIf, pConsumer)
	}

	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan proto.Message)
	cc := &consumerChannels{
		consumers: consumersIf,
		channels:  []chan proto.Message{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go func() {
		if err := sc.startConsumers(ctx, topic); err != nil {
			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
				"topic": topic,
				"error": err})
		}
	}()

	return consumerListeningChannel, nil
}

// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
	// TODO: Replace this development partition consumer with a group consumer
	var pConsumer *scc.Consumer
	var err error
	if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan proto.Message)
	cc := &consumerChannels{
		consumers: []interface{}{pConsumer},
		channels:  []chan proto.Message{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go func() {
		if err := sc.startConsumers(ctx, topic); err != nil {
			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
				"topic": topic,
				"error": err})
		}
	}()

	return consumerListeningChannel, nil
}

func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
	logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	pConsumers := make([]sarama.PartitionConsumer, 0)
	for _, partition := range partitionList {
		var pConsumer sarama.PartitionConsumer
		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
			logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
		pConsumers = append(pConsumers, pConsumer)
	}
	return pConsumers, nil
}

func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
	var i int
	var channel chan proto.Message
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			logger.Debug(ctx, "channel-closed")
			return channels[:len(channels)-1]
		}
	}
	return channels
}

func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; !exist {
		sc.groupConsumers[topic] = consumer
	}
}

func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; exist {
		consumer := sc.groupConsumers[topic]
		delete(sc.groupConsumers, topic)
		if err := consumer.Close(); err != nil {
			logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
			return err
		}
	}
	return nil
}