1/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
28 "github.com/eapache/go-resiliency/breaker"
29 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
33)
34
35type returnErrorFunction func() error
36
37 // consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
38 // topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
39 // consumer or a group consumer.
40type consumerChannels struct {
41 consumers []interface{}
42 channels []chan *ic.InterContainerMessage
43}
44
45// static check to ensure SaramaClient implements Client
46var _ Client = &SaramaClient{}
47
48// SaramaClient represents the messaging proxy
49type SaramaClient struct {
50 cAdmin sarama.ClusterAdmin
51 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
56 groupConsumers map[string]*scc.Consumer
57 lockOfGroupConsumers sync.RWMutex
58 consumerGroupPrefix string
59 consumerType int
60 consumerGroupName string
61 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
73 doneCh chan int
74 metadataCallback func(fromTopic string, timestamp int64)
75 topicToConsumerChannelMap map[string]*consumerChannels
76 lockTopicToConsumerChannelMap sync.RWMutex
77 topicLockMap map[string]*sync.RWMutex
78 lockOfTopicLockMap sync.RWMutex
79 metadataMaxRetry int
80 alive bool
81 liveness chan bool
82 livenessChannelInterval time.Duration
83 lastLivenessTime time.Time
84 started bool
85 healthy bool
86 healthiness chan bool
87}
88
89type SaramaClientOption func(*SaramaClient)
90
91func Host(host string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.KafkaHost = host
94 }
95}
96
97func Port(port int) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.KafkaPort = port
100 }
101}
102
103func ConsumerGroupPrefix(prefix string) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerGroupPrefix = prefix
106 }
107}
108
109func ConsumerGroupName(name string) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.consumerGroupName = name
112 }
113}
114
115func ConsumerType(consumer int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.consumerType = consumer
118 }
119}
120
121func ProducerFlushFrequency(frequency int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushFrequency = frequency
124 }
125}
126
127func ProducerFlushMessages(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerFlushMessages = num
130 }
131}
132
133func ProducerFlushMaxMessages(num int) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerFlushMaxmessages = num
136 }
137}
138
139func ProducerMaxRetries(num int) SaramaClientOption {
140 return func(args *SaramaClient) {
141 args.producerRetryMax = num
142 }
143}
144
145func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
146 return func(args *SaramaClient) {
147 args.producerRetryBackOff = duration
148 }
149}
150
151func ProducerReturnOnErrors(opt bool) SaramaClientOption {
152 return func(args *SaramaClient) {
153 args.producerReturnErrors = opt
154 }
155}
156
157func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
158 return func(args *SaramaClient) {
159 args.producerReturnSuccess = opt
160 }
161}
162
163func ConsumerMaxWait(wait int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.consumerMaxwait = wait
166 }
167}
168
169func MaxProcessingTime(pTime int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.maxProcessingTime = pTime
172 }
173}
174
175func NumPartitions(number int) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.numPartitions = number
178 }
179}
180
181func NumReplicas(number int) SaramaClientOption {
182 return func(args *SaramaClient) {
183 args.numReplicas = number
184 }
185}
186
187func AutoCreateTopic(opt bool) SaramaClientOption {
188 return func(args *SaramaClient) {
189 args.autoCreateTopic = opt
190 }
191}
192
193func MetadatMaxRetries(retry int) SaramaClientOption {
194 return func(args *SaramaClient) {
195 args.metadataMaxRetry = retry
196 }
197}
198
199func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
200 return func(args *SaramaClient) {
201 args.livenessChannelInterval = opt
202 }
203}
204
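// NewSaramaClient constructs a SaramaClient, applying the package defaults and then any
// supplied options. Example (a minimal usage sketch; the broker address and option
// values below are illustrative, not recommendations):
//
//	client := NewSaramaClient(
//		Host("10.0.0.1"),
//		Port(9092),
//		ConsumerType(GroupCustomer),
//		ProducerReturnOnSuccess(true),
//	)
//	if err := client.Start(); err != nil {
//		logger.Errorw("cannot-start-kafka-client", log.Fields{"error": err})
//	}
//	defer client.Stop()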
205func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
206 client := &SaramaClient{
207 KafkaHost: DefaultKafkaHost,
208 KafkaPort: DefaultKafkaPort,
209 }
210 client.consumerType = DefaultConsumerType
211 client.producerFlushFrequency = DefaultProducerFlushFrequency
212 client.producerFlushMessages = DefaultProducerFlushMessages
213 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
214 client.producerReturnErrors = DefaultProducerReturnErrors
215 client.producerReturnSuccess = DefaultProducerReturnSuccess
216 client.producerRetryMax = DefaultProducerRetryMax
217 client.producerRetryBackOff = DefaultProducerRetryBackoff
218 client.consumerMaxwait = DefaultConsumerMaxwait
219 client.maxProcessingTime = DefaultMaxProcessingTime
220 client.numPartitions = DefaultNumberPartitions
221 client.numReplicas = DefaultNumberReplicas
222 client.autoCreateTopic = DefaultAutoCreateTopic
223 client.metadataMaxRetry = DefaultMetadataMaxRetry
224 client.livenessChannelInterval = DefaultLivenessChannelInterval
225
226 for _, option := range opts {
227 option(client)
228 }
229
230 client.groupConsumers = make(map[string]*scc.Consumer)
231
232 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
233 client.topicLockMap = make(map[string]*sync.RWMutex)
234 client.lockOfTopicLockMap = sync.RWMutex{}
235 client.lockOfGroupConsumers = sync.RWMutex{}
236
237 // healthy and alive until proven otherwise
238 client.alive = true
239 client.healthy = true
240
241 return client
242}
243
244func (sc *SaramaClient) Start() error {
245 logger.Info("Starting-kafka-sarama-client")
246
247 // Create the Done channel
248 sc.doneCh = make(chan int, 1)
249
250 var err error
251
252 // Add a cleanup in case of failure to startup
253 defer func() {
254 if err != nil {
255 sc.Stop()
256 }
257 }()
258
259 // Create the Cluster Admin
260 if err = sc.createClusterAdmin(); err != nil {
261 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
262 return err
263 }
264
265 // Create the Publisher
266 if err := sc.createPublisher(); err != nil {
267 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
268 return err
269 }
270
271 if sc.consumerType == DefaultConsumerType {
272 // Create the master consumers
273 if err := sc.createConsumer(); err != nil {
274 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
275 return err
276 }
277 }
278
279 // Create the topic to consumers/channel map
280 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
281
282 logger.Info("kafka-sarama-client-started")
283
284 sc.started = true
285
286 return nil
287}
288
289func (sc *SaramaClient) Stop() {
290 logger.Info("stopping-sarama-client")
291
292 sc.started = false
293
294 //Send a message over the done channel to close all long running routines
295 sc.doneCh <- 1
296
297 if sc.producer != nil {
298 if err := sc.producer.Close(); err != nil {
299 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
300 }
301 }
302
303 if sc.consumer != nil {
304 if err := sc.consumer.Close(); err != nil {
305 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
306 }
307 }
308
309 for key, val := range sc.groupConsumers {
310 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
311 if err := val.Close(); err != nil {
312 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
313 }
314 }
315
316 if sc.cAdmin != nil {
317 if err := sc.cAdmin.Close(); err != nil {
318 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
319 }
320 }
321
322 //TODO: Clear the consumers map
323 //sc.clearConsumerChannelMap()
324
325 logger.Info("sarama-client-stopped")
326}
327
328//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
329// the invoking function must hold the lock
330func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
331 // Set the topic details
332 topicDetail := &sarama.TopicDetail{}
333 topicDetail.NumPartitions = int32(numPartition)
334 topicDetail.ReplicationFactor = int16(repFactor)
335 topicDetail.ConfigEntries = make(map[string]*string)
336 topicDetails := make(map[string]*sarama.TopicDetail)
337 topicDetails[topic.Name] = topicDetail
338
339 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
340 if err == sarama.ErrTopicAlreadyExists {
341 // Not an error
342 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
343 return nil
344 }
345 logger.Errorw("create-topic-failure", log.Fields{"error": err})
346 return err
347 }
348 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
349 // do so.
350 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
351 return nil
352}
353
354//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
355// ensure no two goroutines are performing operations on the same topic
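//
// Example (a sketch; the topic name, partition count and replication factor are illustrative):
//
//	if err := client.CreateTopic(&Topic{Name: "mytopic"}, 3, 1); err != nil {
//		logger.Errorw("create-topic-failed", log.Fields{"error": err})
//	}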
356func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
360 return sc.createTopic(topic, numPartition, repFactor)
361}
362
363//DeleteTopic removes a topic from the kafka Broker
364func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
365 sc.lockTopic(topic)
366 defer sc.unLockTopic(topic)
367
368 // Remove the topic from the broker
369 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
370 if err == sarama.ErrUnknownTopicOrPartition {
371 // Not an error as the topic does not exist
372 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
373 return nil
374 }
375 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
376 return err
377 }
378
379 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
380 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
381 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
382 return err
383 }
384 return nil
385}
386
387// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
388// messages from that topic
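//
// Example (a sketch; the topic name and group id are illustrative, and GroupIdKey is only
// honoured when the client uses the group consumer type):
//
//	ch, err := client.Subscribe(&Topic{Name: "rwcore"}, &KVArg{Key: GroupIdKey, Value: "my-group"})
//	if err != nil {
//		logger.Errorw("subscribe-failed", log.Fields{"error": err})
//		return
//	}
//	go func() {
//		for msg := range ch {
//			// each msg is an *ic.InterContainerMessage published on the topic
//			_ = msg
//		}
//	}()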
389func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
390 sc.lockTopic(topic)
391 defer sc.unLockTopic(topic)
392
393 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
394
395 // If a consumer already exists for that topic then reuse it
396 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
397 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
398 // Create a channel specific to that consumer and add it to the consumer's channel map
399 ch := make(chan *ic.InterContainerMessage)
400 sc.addChannelToConsumerChannelMap(topic, ch)
401 return ch, nil
402 }
403
404 // Register for the topic and set it up
405 var consumerListeningChannel chan *ic.InterContainerMessage
406 var err error
407
408 // Use the consumerType option to figure out the type of consumer to launch
409 if sc.consumerType == PartitionConsumer {
410 if sc.autoCreateTopic {
411 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
412 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
413 return nil, err
414 }
415 }
416 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
417 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
418 return nil, err
419 }
420 } else if sc.consumerType == GroupCustomer {
421 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
422 // does not consume from a precreated topic in some scenarios
423 //if sc.autoCreateTopic {
424 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
425 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
426 // return nil, err
427 // }
428 //}
429 //groupId := sc.consumerGroupName
430 groupId := getGroupId(kvArgs...)
431 // Include the group prefix
432 if groupId != "" {
433 groupId = sc.consumerGroupPrefix + groupId
434 } else {
435 // Need to use a unique group Id per topic
436 groupId = sc.consumerGroupPrefix + topic.Name
437 }
438 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
439 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
440 return nil, err
441 }
442
443 } else {
444 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
445 return nil, errors.New("unknown-consumer-type")
446 }
447
448 return consumerListeningChannel, nil
449}
450
451//UnSubscribe unsubscribes a consumer from a given topic
452func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
453 sc.lockTopic(topic)
454 defer sc.unLockTopic(topic)
455
456 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
457 var err error
458 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
459 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
460 }
461 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
462 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
463 }
464 return err
465}
466
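// SubscribeForMetadata registers a callback that is invoked with the source topic and the
// timestamp of every message dispatched to consumers (see dispatchToConsumers).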
467func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
468 sc.metadataCallback = callback
469}
470
471func (sc *SaramaClient) updateLiveness(alive bool) {
472 // Post a consistent stream of liveness data to the channel,
473 // so that in a live state, the core does not time out and
474 // send a forced liveness message. Production of liveness
475 // events to the channel is rate-limited by livenessChannelInterval.
476 if sc.liveness != nil {
477 if sc.alive != alive {
478 logger.Info("update-liveness-channel-because-change")
479 sc.liveness <- alive
480 sc.lastLivenessTime = time.Now()
481 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
482 logger.Info("update-liveness-channel-because-interval")
483 sc.liveness <- alive
484 sc.lastLivenessTime = time.Now()
485 }
486 }
487
488 // Only emit a log message when the state changes
489 if sc.alive != alive {
490 logger.Info("set-client-alive", log.Fields{"alive": alive})
491 sc.alive = alive
492 }
493}
494
495// Once unhealthy, we never go back
496func (sc *SaramaClient) setUnhealthy() {
497 sc.healthy = false
498 if sc.healthiness != nil {
499 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
500 sc.healthiness <- sc.healthy
501 }
502}
503
504func (sc *SaramaClient) isLivenessError(err error) bool {
505 // Sarama producers and consumers encapsulate the error inside
506 // a ProducerError or ConsumerError struct.
507 if prodError, ok := err.(*sarama.ProducerError); ok {
508 err = prodError.Err
509 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
510 err = consumerError.Err
511 }
512
513 // Sarama-Cluster will compose the error into a ClusterError struct,
514 // which we can't compare by reference. To handle that, the best we
515 // can do is compare the error strings.
516
517 switch err.Error() {
518 case context.DeadlineExceeded.Error():
519 logger.Info("is-liveness-error-timeout")
520 return true
521 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
522 logger.Info("is-liveness-error-no-brokers")
523 return true
524 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
525 logger.Info("is-liveness-error-shutting-down")
526 return true
527 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
528 logger.Info("is-liveness-error-not-available")
529 return true
530 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
531 logger.Info("is-liveness-error-circuit-breaker-open")
532 return true
533 }
534
535 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
536 logger.Info("is-liveness-error-connection-refused")
537 return true
538 }
539
540 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
541 logger.Info("is-liveness-error-io-timeout")
542 return true
543 }
544
545 // Other errors shouldn't trigger a loss of liveness
546
547 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
548
549 return false
550}
551
552// Send formats and sends the request onto the kafka messaging bus.
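//
// Example (a sketch; any proto.Message can be sent, and the topic and key shown are illustrative):
//
//	icm := &ic.InterContainerMessage{}
//	if err := client.Send(icm, &Topic{Name: "mytopic"}, "device-1234"); err != nil {
//		logger.Errorw("send-failed", log.Fields{"error": err})
//	}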
553func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
554
555 // Assert message is a proto message
556 var protoMsg proto.Message
557 var ok bool
558 // ascertain the value interface type is a proto.Message
559 if protoMsg, ok = msg.(proto.Message); !ok {
560 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
561 return fmt.Errorf("not-a-proto-msg-%s", msg)
562 }
563
564 var marshalled []byte
565 var err error
566 // Marshal the proto message
567 if marshalled, err = proto.Marshal(protoMsg); err != nil {
568 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
569 return err
570 }
571 key := ""
572 if len(keys) > 0 {
573 key = keys[0] // Only the first key is relevant
574 }
575 kafkaMsg := &sarama.ProducerMessage{
576 Topic: topic.Name,
577 Key: sarama.StringEncoder(key),
578 Value: sarama.ByteEncoder(marshalled),
579 }
580
581 // Send message to kafka
582 sc.producer.Input() <- kafkaMsg
583 // Wait for result
584 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
585 select {
586 case ok := <-sc.producer.Successes():
587 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
588 sc.updateLiveness(true)
589 case notOk := <-sc.producer.Errors():
590 logger.Debugw("error-sending", log.Fields{"status": notOk})
591 if sc.isLivenessError(notOk) {
592 sc.updateLiveness(false)
593 }
594 return notOk
595 }
596 return nil
597}
598
599// Enable the liveness monitor channel. This channel will report
600// a "true" or "false" on every publish, which indicates whether
601// or not the channel is still live. This channel is then picked up
602// by the service (i.e. rw_core / ro_core) to update readiness status
603// and/or take other actions.
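//
// Example (a sketch of how a service might consume the returned channel):
//
//	liveness := client.EnableLivenessChannel(true)
//	go func() {
//		for alive := range liveness {
//			logger.Infow("kafka-liveness-changed", log.Fields{"alive": alive})
//		}
//	}()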
604func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
605 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
606 if enable {
607 if sc.liveness == nil {
608 logger.Info("kafka-create-liveness-channel")
609 // At least 1, so we can immediately post to it without blocking
610 // Setting a bigger number (10) allows the monitor to fall behind
611 // without blocking others. The monitor shouldn't really fall
612 // behind...
613 sc.liveness = make(chan bool, 10)
614 // post initial state to the channel
615 sc.liveness <- sc.alive
616 }
617 } else {
618 // TODO: Think about whether we need the ability to turn off
619 // liveness monitoring
620 panic("Turning off liveness reporting is not supported")
621 }
622 return sc.liveness
623}
624
625// Enable the Healthiness monitor channel. This channel will report "false"
626// if the kafka consumers die, or some other problem occurs which is
627// catastrophic that would require re-creating the client.
628func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
629 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
630 if enable {
631 if sc.healthiness == nil {
632 logger.Info("kafka-create-healthiness-channel")
633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post initial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // healthiness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
649// SendLiveness publishes a timestamped test message to Kafka to check whether connectivity has
650// been restored.
651func (sc *SaramaClient) SendLiveness() error {
652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
667 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
668 sc.updateLiveness(true)
669 case notOk := <-sc.producer.Errors():
670 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
671 if sc.isLivenessError(notOk) {
672 sc.updateLiveness(false)
673 }
674 return notOk
675 }
676 return nil
677}
678
679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
699func (sc *SaramaClient) createClusterAdmin() error {
700 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
707 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
708 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
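// lockTopic acquires the per-topic lock for the given topic, creating the lock on first use.
// The map of per-topic locks is itself guarded by lockOfTopicLockMap so that concurrent
// callers cannot race while creating a lock entry.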
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
743func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
744 sc.lockTopicToConsumerChannelMap.Lock()
745 defer sc.lockTopicToConsumerChannelMap.Unlock()
746 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
747 delete(sc.topicToConsumerChannelMap, id)
748 }
749}
750
751func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
752 sc.lockTopicToConsumerChannelMap.RLock()
753 defer sc.lockTopicToConsumerChannelMap.RUnlock()
754
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 return consumerCh
757 }
758 return nil
759}
760
761func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
762 sc.lockTopicToConsumerChannelMap.Lock()
763 defer sc.lockTopicToConsumerChannelMap.Unlock()
764 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
765 consumerCh.channels = append(consumerCh.channels, ch)
766 return
767 }
768 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
769}
770
771//closeConsumers closes a list of sarama consumers. The consumers can be either partition consumers or group consumers
772func closeConsumers(consumers []interface{}) error {
773 var err error
774 for _, consumer := range consumers {
775 // Is it a partition consumer?
776 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
777 if errTemp := partionConsumer.Close(); errTemp != nil {
778 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
787 if errTemp := groupConsumer.Close(); errTemp != nil {
788 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
789 // This can occur on race condition
790 err = nil
791 } else {
792 err = errTemp
793 }
794 }
795 }
796 }
797 return err
798}
799
800func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
801 sc.lockTopicToConsumerChannelMap.Lock()
802 defer sc.lockTopicToConsumerChannelMap.Unlock()
803 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
804 // Channel will be closed in the removeChannel method
805 consumerCh.channels = removeChannel(consumerCh.channels, ch)
806 // If there are no more channels then we can close the consumers themselves
807 if len(consumerCh.channels) == 0 {
808 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
809 err := closeConsumers(consumerCh.consumers)
810 //err := consumerCh.consumers.Close()
811 delete(sc.topicToConsumerChannelMap, topic.Name)
812 return err
813 }
814 return nil
815 }
816 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
817 return errors.New("topic-does-not-exist")
818}
819
820func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
821 sc.lockTopicToConsumerChannelMap.Lock()
822 defer sc.lockTopicToConsumerChannelMap.Unlock()
823 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
824 for _, ch := range consumerCh.channels {
825 // Channel will be closed in the removeChannel method
826 removeChannel(consumerCh.channels, ch)
827 }
828 err := closeConsumers(consumerCh.consumers)
829 //if err == sarama.ErrUnknownTopicOrPartition {
830 // // Not an error
831 // err = nil
832 //}
833 //err := consumerCh.consumers.Close()
834 delete(sc.topicToConsumerChannelMap, topic.Name)
835 return err
836 }
837 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
838 return nil
839}
840
841func (sc *SaramaClient) clearConsumerChannelMap() error {
842 sc.lockTopicToConsumerChannelMap.Lock()
843 defer sc.lockTopicToConsumerChannelMap.Unlock()
844 var err error
845 for topic, consumerCh := range sc.topicToConsumerChannelMap {
846 for _, ch := range consumerCh.channels {
847 // Channel will be closed in the removeChannel method
848 removeChannel(consumerCh.channels, ch)
849 }
850 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
851 err = errTemp
852 }
853 //err = consumerCh.consumers.Close()
854 delete(sc.topicToConsumerChannelMap, topic)
855 }
856 return err
857}
858
859//createPublisher creates the publisher which is used to send a message onto kafka
860func (sc *SaramaClient) createPublisher() error {
861 // This Creates the publisher
862 config := sarama.NewConfig()
863 config.Producer.Partitioner = sarama.NewRandomPartitioner
864 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
865 config.Producer.Flush.Messages = sc.producerFlushMessages
866 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
867 config.Producer.Return.Errors = sc.producerReturnErrors
868 config.Producer.Return.Successes = sc.producerReturnSuccess
869 //config.Producer.RequiredAcks = sarama.WaitForAll
870 config.Producer.RequiredAcks = sarama.WaitForLocal
871
872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
875 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
876 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
877 return err
878 } else {
879 sc.producer = producer
880 }
881 logger.Info("Kafka-publisher-created")
882 return nil
883}
884
885func (sc *SaramaClient) createConsumer() error {
886 config := sarama.NewConfig()
887 config.Consumer.Return.Errors = true
888 config.Consumer.Fetch.Min = 1
889 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
890 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
891 config.Consumer.Offsets.Initial = sarama.OffsetNewest
892 config.Metadata.Retry.Max = sc.metadataMaxRetry
893 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
894 brokers := []string{kafkaFullAddr}
895
896 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
897 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
898 return err
899 } else {
900 sc.consumer = consumer
901 }
902 logger.Info("Kafka-consumers-created")
903 return nil
904}
905
906// createGroupConsumer creates a consumer group
907func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
908 config := scc.NewConfig()
909 config.ClientID = uuid.New().String()
910 config.Group.Mode = scc.ConsumerModeMultiplex
911 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
912 config.Consumer.Return.Errors = true
913 //config.Group.Return.Notifications = false
914 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
915 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
916 config.Consumer.Offsets.Initial = initialOffset
917 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
918 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
919 brokers := []string{kafkaFullAddr}
920
921 topics := []string{topic.Name}
922 var consumer *scc.Consumer
923 var err error
924
925 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
926 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
927 return nil, err
928 }
929 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
930
931 //sc.groupConsumers[topic.Name] = consumer
932 sc.addToGroupConsumers(topic.Name, consumer)
933 return consumer, nil
934}
935
936// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
937// topic via the unique channel each subscriber received during subscription.
938func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
939 // Need to go over all channels and publish messages to them - do we need to copy msg?
940 sc.lockTopicToConsumerChannelMap.RLock()
941 for _, ch := range consumerCh.channels {
942 go func(c chan *ic.InterContainerMessage) {
943 c <- protoMessage
944 }(ch)
945 }
946 sc.lockTopicToConsumerChannelMap.RUnlock()
947
948 if callback := sc.metadataCallback; callback != nil {
949 callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
950 }
951}
952
953func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
954 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
955startloop:
956 for {
957 select {
958 case err, ok := <-consumer.Errors():
959 if ok {
960 if sc.isLivenessError(err) {
961 sc.updateLiveness(false)
962 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
963 }
964 } else {
965 // Channel is closed
966 break startloop
967 }
968 case msg, ok := <-consumer.Messages():
969 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
970 if !ok {
971 // channel is closed
972 break startloop
973 }
974 msgBody := msg.Value
975 sc.updateLiveness(true)
976 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
977 icm := &ic.InterContainerMessage{}
978 if err := proto.Unmarshal(msgBody, icm); err != nil {
979 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
980 continue
981 }
982 go sc.dispatchToConsumers(consumerChnls, icm)
983 case <-sc.doneCh:
984 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
985 break startloop
986 }
987 }
988 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
989 sc.setUnhealthy()
990}
991
992func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
993 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
994
995startloop:
996 for {
997 select {
998 case err, ok := <-consumer.Errors():
999 if ok {
1000 if sc.isLivenessError(err) {
1001 sc.updateLiveness(false)
1002 }
1003 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
1004 } else {
1005 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
1006 // channel is closed
1007 break startloop
1008 }
1009 case msg, ok := <-consumer.Messages():
1010 if !ok {
1011 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
1012 // Channel closed
1013 break startloop
1014 }
1015 sc.updateLiveness(true)
1016 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
1017 msgBody := msg.Value
1018 icm := &ic.InterContainerMessage{}
1019 if err := proto.Unmarshal(msgBody, icm); err != nil {
1020 logger.Warnw("invalid-message", log.Fields{"error": err})
1021 continue
1022 }
1023 go sc.dispatchToConsumers(consumerChnls, icm)
1024 consumer.MarkOffset(msg, "")
1025 case ntf := <-consumer.Notifications():
1026 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
1027 case <-sc.doneCh:
1028 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
1029 break startloop
1030 }
1031 }
1032 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
1033 sc.setUnhealthy()
1034}
1035
1036func (sc *SaramaClient) startConsumers(topic *Topic) error {
1037 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
1038 var consumerCh *consumerChannels
1039 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
1040 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
1041 return errors.New("consumers-not-exist")
1042 }
1043 // For each consumer listening for that topic, start a consumption loop
1044 for _, consumer := range consumerCh.consumers {
1045 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1046 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1047 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1048 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1049 } else {
1050 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
1051 return errors.New("invalid-consumer")
1052 }
1053 }
1054 return nil
1055}
1056
1057// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1058// for that topic. It also starts the routine that listens for messages on that topic.
1059func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1060 var pConsumers []sarama.PartitionConsumer
1061 var err error
1062
1063 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
1064 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1065 return nil, err
1066 }
1067
1068 consumersIf := make([]interface{}, 0)
1069 for _, pConsumer := range pConsumers {
1070 consumersIf = append(consumersIf, pConsumer)
1071 }
1072
1073 // Create the consumers/channel structure, set the consumers and create a channel on that topic - for now the
1074 // channel is unbuffered to help expose race conditions.
1075 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1076 cc := &consumerChannels{
1077 consumers: consumersIf,
1078 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1079 }
1080
1081 // Add the consumers channel to the map
1082 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1083
1084 //Start a consumer to listen on that specific topic
1085 go sc.startConsumers(topic)
1086
1087 return consumerListeningChannel, nil
1088}
1089
1090// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1091// for that topic. It also starts the routine that listens for messages on that topic.
1092func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1093 // TODO: Replace this development partition consumers with a group consumers
1094 var pConsumer *scc.Consumer
1095 var err error
1096 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1097 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1098 return nil, err
1099 }
1100 // Create the consumers/channel structure, set the consumers and create a channel on that topic - for now the
1101 // channel is unbuffered to help expose race conditions.
1102 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1103 cc := &consumerChannels{
1104 consumers: []interface{}{pConsumer},
1105 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1106 }
1107
1108 // Add the consumers channel to the map
1109 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1110
1111 //Start a consumer to listen on that specific topic
1112 go sc.startConsumers(topic)
1113
1114 return consumerListeningChannel, nil
1115}
1116
1117func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1118 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
1119 partitionList, err := sc.consumer.Partitions(topic.Name)
1120 if err != nil {
1121 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1122 return nil, err
1123 }
1124
1125 pConsumers := make([]sarama.PartitionConsumer, 0)
1126 for _, partition := range partitionList {
1127 var pConsumer sarama.PartitionConsumer
1128 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1129 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1130 return nil, err
1131 }
1132 pConsumers = append(pConsumers, pConsumer)
1133 }
1134 return pConsumers, nil
1135}
1136
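// removeChannel closes the given channel and removes it from the slice by swapping it with
// the last element and truncating; the ordering of the remaining channels is not preserved.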
1137func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1138 var i int
1139 var channel chan *ic.InterContainerMessage
1140 for i, channel = range channels {
1141 if channel == ch {
1142 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1143 close(channel)
1144 logger.Debug("channel-closed")
1145 return channels[:len(channels)-1]
1146 }
1147 }
1148 return channels
1149}
1150
1151func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1152 sc.lockOfGroupConsumers.Lock()
1153 defer sc.lockOfGroupConsumers.Unlock()
1154 if _, exist := sc.groupConsumers[topic]; !exist {
1155 sc.groupConsumers[topic] = consumer
1156 }
1157}
1158
1159func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1160 sc.lockOfGroupConsumers.Lock()
1161 defer sc.lockOfGroupConsumers.Unlock()
1162 if _, exist := sc.groupConsumers[topic]; exist {
1163 consumer := sc.groupConsumers[topic]
1164 delete(sc.groupConsumers, topic)
1165 if err := consumer.Close(); err != nil {
1166 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
1167 return err
1168 }
1169 }
1170 return nil
1171}