1/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/Shopify/sarama"
27 scc "github.com/bsm/sarama-cluster"
28 "github.com/eapache/go-resiliency/breaker"
29 "github.com/golang/protobuf/proto"
30	"github.com/golang/protobuf/ptypes"
31	"github.com/google/uuid"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
34)
35
36// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
38// consumer or a group consumer
39type consumerChannels struct {
40 consumers []interface{}
41 channels []chan *ic.InterContainerMessage
42}
43
44// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
47// SaramaClient represents the messaging proxy
48type SaramaClient struct {
49 cAdmin sarama.ClusterAdmin
50	KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumers map[string]*scc.Consumer
55 lockOfGroupConsumers sync.RWMutex
56 consumerGroupPrefix string
57 consumerType int
58 consumerGroupName string
59 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
71 doneCh chan int
72	metadataCallback func(fromTopic string, timestamp time.Time)
73	topicToConsumerChannelMap map[string]*consumerChannels
74 lockTopicToConsumerChannelMap sync.RWMutex
75 topicLockMap map[string]*sync.RWMutex
76 lockOfTopicLockMap sync.RWMutex
77 metadataMaxRetry int
78 alive bool
79 liveness chan bool
80 livenessChannelInterval time.Duration
81 lastLivenessTime time.Time
82 started bool
83 healthy bool
84 healthiness chan bool
85}
86
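// SaramaClientOption is a functional option applied by NewSaramaClient to override
// one of the client defaults at construction time.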
87type SaramaClientOption func(*SaramaClient)
88
89func Host(host string) SaramaClientOption {
90 return func(args *SaramaClient) {
91 args.KafkaHost = host
92 }
93}
94
95func Port(port int) SaramaClientOption {
96 return func(args *SaramaClient) {
97 args.KafkaPort = port
98 }
99}
100
101func ConsumerGroupPrefix(prefix string) SaramaClientOption {
102 return func(args *SaramaClient) {
103 args.consumerGroupPrefix = prefix
104 }
105}
106
107func ConsumerGroupName(name string) SaramaClientOption {
108 return func(args *SaramaClient) {
109 args.consumerGroupName = name
110 }
111}
112
113func ConsumerType(consumer int) SaramaClientOption {
114 return func(args *SaramaClient) {
115 args.consumerType = consumer
116 }
117}
118
119func ProducerFlushFrequency(frequency int) SaramaClientOption {
120 return func(args *SaramaClient) {
121 args.producerFlushFrequency = frequency
122 }
123}
124
125func ProducerFlushMessages(num int) SaramaClientOption {
126 return func(args *SaramaClient) {
127 args.producerFlushMessages = num
128 }
129}
130
131func ProducerFlushMaxMessages(num int) SaramaClientOption {
132 return func(args *SaramaClient) {
133 args.producerFlushMaxmessages = num
134 }
135}
136
137func ProducerMaxRetries(num int) SaramaClientOption {
138 return func(args *SaramaClient) {
139 args.producerRetryMax = num
140 }
141}
142
143func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
144 return func(args *SaramaClient) {
145 args.producerRetryBackOff = duration
146 }
147}
148
149func ProducerReturnOnErrors(opt bool) SaramaClientOption {
150 return func(args *SaramaClient) {
151 args.producerReturnErrors = opt
152 }
153}
154
155func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
156 return func(args *SaramaClient) {
157 args.producerReturnSuccess = opt
158 }
159}
160
161func ConsumerMaxWait(wait int) SaramaClientOption {
162 return func(args *SaramaClient) {
163 args.consumerMaxwait = wait
164 }
165}
166
167func MaxProcessingTime(pTime int) SaramaClientOption {
168 return func(args *SaramaClient) {
169 args.maxProcessingTime = pTime
170 }
171}
172
173func NumPartitions(number int) SaramaClientOption {
174 return func(args *SaramaClient) {
175 args.numPartitions = number
176 }
177}
178
179func NumReplicas(number int) SaramaClientOption {
180 return func(args *SaramaClient) {
181 args.numReplicas = number
182 }
183}
184
185func AutoCreateTopic(opt bool) SaramaClientOption {
186 return func(args *SaramaClient) {
187 args.autoCreateTopic = opt
188 }
189}
190
191func MetadatMaxRetries(retry int) SaramaClientOption {
192 return func(args *SaramaClient) {
193 args.metadataMaxRetry = retry
194 }
195}
196
197func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
198 return func(args *SaramaClient) {
199 args.livenessChannelInterval = opt
200 }
201}
202
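// NewSaramaClient returns a SaramaClient initialised with the package defaults and
// then modified by any supplied options. A minimal usage sketch follows; the broker
// address and option values are illustrative assumptions, not defaults of this package:
//
//	client := kafka.NewSaramaClient(
//		kafka.Host("kafka.voltha.svc"),
//		kafka.Port(9092),
//		kafka.ConsumerType(kafka.GroupCustomer),
//		kafka.AutoCreateTopic(true),
//	)
//	if err := client.Start(); err != nil {
//		// handle startup failure
//	}
//	defer client.Stop()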
203func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
204 client := &SaramaClient{
205 KafkaHost: DefaultKafkaHost,
206 KafkaPort: DefaultKafkaPort,
207 }
208 client.consumerType = DefaultConsumerType
209 client.producerFlushFrequency = DefaultProducerFlushFrequency
210 client.producerFlushMessages = DefaultProducerFlushMessages
211 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
212 client.producerReturnErrors = DefaultProducerReturnErrors
213 client.producerReturnSuccess = DefaultProducerReturnSuccess
214 client.producerRetryMax = DefaultProducerRetryMax
215 client.producerRetryBackOff = DefaultProducerRetryBackoff
216 client.consumerMaxwait = DefaultConsumerMaxwait
217 client.maxProcessingTime = DefaultMaxProcessingTime
218 client.numPartitions = DefaultNumberPartitions
219 client.numReplicas = DefaultNumberReplicas
220 client.autoCreateTopic = DefaultAutoCreateTopic
221 client.metadataMaxRetry = DefaultMetadataMaxRetry
222 client.livenessChannelInterval = DefaultLivenessChannelInterval
223
224 for _, option := range opts {
225 option(client)
226 }
227
228 client.groupConsumers = make(map[string]*scc.Consumer)
229
230 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
231 client.topicLockMap = make(map[string]*sync.RWMutex)
232 client.lockOfTopicLockMap = sync.RWMutex{}
233 client.lockOfGroupConsumers = sync.RWMutex{}
234
235 // healthy and alive until proven otherwise
236 client.alive = true
237 client.healthy = true
238
239 return client
240}
241
242func (sc *SaramaClient) Start() error {
243 logger.Info("Starting-kafka-sarama-client")
244
245 // Create the Done channel
246 sc.doneCh = make(chan int, 1)
247
248 var err error
249
250 // Add a cleanup in case of failure to startup
251 defer func() {
252 if err != nil {
253 sc.Stop()
254 }
255 }()
256
257 // Create the Cluster Admin
258 if err = sc.createClusterAdmin(); err != nil {
259 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
260 return err
261 }
262
263 // Create the Publisher
264 if err := sc.createPublisher(); err != nil {
265 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
266 return err
267 }
268
269 if sc.consumerType == DefaultConsumerType {
270 // Create the master consumers
271 if err := sc.createConsumer(); err != nil {
272 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
273 return err
274 }
275 }
276
277 // Create the topic to consumers/channel map
278 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
279
280 logger.Info("kafka-sarama-client-started")
281
282 sc.started = true
283
284 return nil
285}
286
287func (sc *SaramaClient) Stop() {
288 logger.Info("stopping-sarama-client")
289
290 sc.started = false
291
292 //Send a message over the done channel to close all long running routines
293 sc.doneCh <- 1
294
295 if sc.producer != nil {
296 if err := sc.producer.Close(); err != nil {
297 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
298 }
299 }
300
301 if sc.consumer != nil {
302 if err := sc.consumer.Close(); err != nil {
303 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
304 }
305 }
306
307 for key, val := range sc.groupConsumers {
308 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
309 if err := val.Close(); err != nil {
310 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
311 }
312 }
313
314 if sc.cAdmin != nil {
315 if err := sc.cAdmin.Close(); err != nil {
316 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
317 }
318 }
319
320 //TODO: Clear the consumers map
321 //sc.clearConsumerChannelMap()
322
323 logger.Info("sarama-client-stopped")
324}
325
326//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
327// the invoking function must hold the lock
328func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
329 // Set the topic details
330 topicDetail := &sarama.TopicDetail{}
331 topicDetail.NumPartitions = int32(numPartition)
332 topicDetail.ReplicationFactor = int16(repFactor)
333 topicDetail.ConfigEntries = make(map[string]*string)
334 topicDetails := make(map[string]*sarama.TopicDetail)
335 topicDetails[topic.Name] = topicDetail
336
337 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
338 if err == sarama.ErrTopicAlreadyExists {
339 // Not an error
340 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
341 return nil
342 }
343 logger.Errorw("create-topic-failure", log.Fields{"error": err})
344 return err
345 }
346 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
347 // do so.
348 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
349 return nil
350}
351
352//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
353// ensure no two go routines are performing operations on the same topic
354func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
358 return sc.createTopic(topic, numPartition, repFactor)
359}
360
361//DeleteTopic removes a topic from the kafka Broker
362func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
363 sc.lockTopic(topic)
364 defer sc.unLockTopic(topic)
365
366 // Remove the topic from the broker
367 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
368 if err == sarama.ErrUnknownTopicOrPartition {
369		// Not an error as the topic does not exist
370 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
371 return nil
372 }
373 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
374 return err
375 }
376
377 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
378 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
379 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
380 return err
381 }
382 return nil
383}
384
385// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
386// messages from that topic
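// A usage sketch (the topic name and group id below are illustrative assumptions):
//
//	ch, err := client.Subscribe(&kafka.Topic{Name: "my-topic"},
//		&kafka.KVArg{Key: kafka.GroupIdKey, Value: "my-consumer-group"})
//	if err == nil {
//		go func() {
//			for msg := range ch {
//				// process the received *ic.InterContainerMessage
//				_ = msg
//			}
//		}()
//	}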
387func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
388 sc.lockTopic(topic)
389 defer sc.unLockTopic(topic)
390
391 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
392
393	// If a consumer already exists for that topic then reuse it
394 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
395 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
396		// Create a channel specific to that consumer and add it to the consumer channel map
397 ch := make(chan *ic.InterContainerMessage)
398 sc.addChannelToConsumerChannelMap(topic, ch)
399 return ch, nil
400 }
401
402 // Register for the topic and set it up
403 var consumerListeningChannel chan *ic.InterContainerMessage
404 var err error
405
406 // Use the consumerType option to figure out the type of consumer to launch
407 if sc.consumerType == PartitionConsumer {
408 if sc.autoCreateTopic {
409 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
410 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
411 return nil, err
412 }
413 }
414 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
415 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
416 return nil, err
417 }
418 } else if sc.consumerType == GroupCustomer {
419 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
420 // does not consume from a precreated topic in some scenarios
421 //if sc.autoCreateTopic {
422 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
423 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
424 // return nil, err
425 // }
426 //}
427 //groupId := sc.consumerGroupName
428 groupId := getGroupId(kvArgs...)
429 // Include the group prefix
430 if groupId != "" {
431 groupId = sc.consumerGroupPrefix + groupId
432 } else {
433 // Need to use a unique group Id per topic
434 groupId = sc.consumerGroupPrefix + topic.Name
435 }
436 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
437 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
438 return nil, err
439 }
440
441 } else {
442 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
443 return nil, errors.New("unknown-consumer-type")
444 }
445
446 return consumerListeningChannel, nil
447}
448
449//UnSubscribe unsubscribes a consumer from a given topic
450func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
451 sc.lockTopic(topic)
452 defer sc.unLockTopic(topic)
453
454 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
455 var err error
456 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
457 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
458 }
459 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
460 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
461 }
462 return err
463}
464
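// SubscribeForMetadata registers a callback that is invoked with the source topic and
// message timestamp of every message dispatched to subscribers. Only a single callback
// is retained; a subsequent call replaces the previous one.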
465func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
466	sc.metadataCallback = callback
467}
468
469func (sc *SaramaClient) updateLiveness(alive bool) {
470 // Post a consistent stream of liveness data to the channel,
471 // so that in a live state, the core does not timeout and
472 // send a forced liveness message. Production of liveness
473 // events to the channel is rate-limited by livenessChannelInterval.
474 if sc.liveness != nil {
475 if sc.alive != alive {
476 logger.Info("update-liveness-channel-because-change")
477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
479		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
480			logger.Info("update-liveness-channel-because-interval")
481 sc.liveness <- alive
482 sc.lastLivenessTime = time.Now()
483 }
484 }
485
486 // Only emit a log message when the state changes
487 if sc.alive != alive {
488 logger.Info("set-client-alive", log.Fields{"alive": alive})
489 sc.alive = alive
490 }
491}
492
493// Once unhealthy, we never go back
494func (sc *SaramaClient) setUnhealthy() {
495 sc.healthy = false
496 if sc.healthiness != nil {
497 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
498 sc.healthiness <- sc.healthy
499 }
500}
501
502func (sc *SaramaClient) isLivenessError(err error) bool {
503 // Sarama producers and consumers encapsulate the error inside
504 // a ProducerError or ConsumerError struct.
505 if prodError, ok := err.(*sarama.ProducerError); ok {
506 err = prodError.Err
507 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
508 err = consumerError.Err
509 }
510
511	// Sarama-Cluster will compose the error into a ClusterError struct,
512	// which we can't compare by reference. To handle that, the best
513	// we can do is compare the error strings.
514
515 switch err.Error() {
516 case context.DeadlineExceeded.Error():
517 logger.Info("is-liveness-error-timeout")
518 return true
519 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
520 logger.Info("is-liveness-error-no-brokers")
521 return true
522 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
523 logger.Info("is-liveness-error-shutting-down")
524 return true
525 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
526 logger.Info("is-liveness-error-not-available")
527 return true
528 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
529 logger.Info("is-liveness-error-circuit-breaker-open")
530 return true
531 }
532
533 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
534 logger.Info("is-liveness-error-connection-refused")
535 return true
536 }
537
538 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
539 logger.Info("is-liveness-error-io-timeout")
540 return true
541 }
542
543 // Other errors shouldn't trigger a loss of liveness
544
545 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
546
547 return false
548}
549
550// Send formats and sends the request onto the kafka messaging bus.
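// Only the first supplied key, if any, is attached to the produced message. A call
// sketch (the message, topic and key below are illustrative assumptions):
//
//	err := client.Send(interContainerMsg, &kafka.Topic{Name: "adapter-topic"}, deviceID)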
551func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
552
553 // Assert message is a proto message
554 var protoMsg proto.Message
555 var ok bool
556 // ascertain the value interface type is a proto.Message
557 if protoMsg, ok = msg.(proto.Message); !ok {
558 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
559		return fmt.Errorf("not-a-proto-msg-%s", msg)
560	}
561
562 var marshalled []byte
563 var err error
564 // Create the Sarama producer message
565 if marshalled, err = proto.Marshal(protoMsg); err != nil {
566 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
567 return err
568 }
569 key := ""
570 if len(keys) > 0 {
571 key = keys[0] // Only the first key is relevant
572 }
573 kafkaMsg := &sarama.ProducerMessage{
574 Topic: topic.Name,
575 Key: sarama.StringEncoder(key),
576 Value: sarama.ByteEncoder(marshalled),
577 }
578
579 // Send message to kafka
580 sc.producer.Input() <- kafkaMsg
581 // Wait for result
582 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
583 select {
584 case ok := <-sc.producer.Successes():
585 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
586 sc.updateLiveness(true)
587 case notOk := <-sc.producer.Errors():
588 logger.Debugw("error-sending", log.Fields{"status": notOk})
589 if sc.isLivenessError(notOk) {
590 sc.updateLiveness(false)
591 }
592 return notOk
593 }
594 return nil
595}
596
597// Enable the liveness monitor channel. This channel will report
598// a "true" or "false" on every publish, which indicates whether
599// or not the channel is still live. This channel is then picked up
600// by the service (i.e. rw_core / ro_core) to update readiness status
601// and/or take other actions.
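// A consumption sketch (the monitoring goroutine is an assumption of the caller, not
// part of this package):
//
//	liveness := client.EnableLivenessChannel(true)
//	go func() {
//		for alive := range liveness {
//			// update readiness / metrics with the current kafka liveness
//			_ = alive
//		}
//	}()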
602func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
603 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
604 if enable {
605 if sc.liveness == nil {
606 logger.Info("kafka-create-liveness-channel")
607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612			// post initial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
626func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
627 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
628 if enable {
629 if sc.healthiness == nil {
630 logger.Info("kafka-create-healthiness-channel")
631 // At least 1, so we can immediately post to it without blocking
632 // Setting a bigger number (10) allows the monitor to fall behind
633 // without blocking others. The monitor shouldn't really fall
634 // behind...
635 sc.healthiness = make(chan bool, 10)
636			// post initial state to the channel
637 sc.healthiness <- sc.healthy
638 }
639 } else {
640		// TODO: Think about whether we need the ability to turn off
641		// healthiness monitoring
642 panic("Turning off healthiness reporting is not supported")
643 }
644 return sc.healthiness
645}
646
647// SendLiveness sends a test message to kafka to check whether connectivity has
648// been restored.
649func (sc *SaramaClient) SendLiveness() error {
650 if !sc.started {
651 return fmt.Errorf("SendLiveness() called while not started")
652 }
653
654 kafkaMsg := &sarama.ProducerMessage{
655 Topic: "_liveness_test",
656 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
657 }
658
659 // Send message to kafka
660 sc.producer.Input() <- kafkaMsg
661 // Wait for result
662 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
663 select {
664 case ok := <-sc.producer.Successes():
665 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
666 sc.updateLiveness(true)
667 case notOk := <-sc.producer.Errors():
668 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
669 if sc.isLivenessError(notOk) {
670 sc.updateLiveness(false)
671 }
672 return notOk
673 }
674 return nil
675}
676
677// getGroupId returns the group id from the key-value args.
678func getGroupId(kvArgs ...*KVArg) string {
679 for _, arg := range kvArgs {
680 if arg.Key == GroupIdKey {
681 return arg.Value.(string)
682 }
683 }
684 return ""
685}
686
687// getOffset returns the offset from the key-value args.
688func getOffset(kvArgs ...*KVArg) int64 {
689 for _, arg := range kvArgs {
690 if arg.Key == Offset {
691 return arg.Value.(int64)
692 }
693 }
694 return sarama.OffsetNewest
695}
696
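// createClusterAdmin connects a sarama ClusterAdmin to the configured broker; it is
// used by the client for topic creation and deletion.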
697func (sc *SaramaClient) createClusterAdmin() error {
698 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
699 config := sarama.NewConfig()
700 config.Version = sarama.V1_0_0_0
701
702 // Create a cluster Admin
703 var cAdmin sarama.ClusterAdmin
704 var err error
705 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
706 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
707 return err
708 }
709 sc.cAdmin = cAdmin
710 return nil
711}
712
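// lockTopic acquires the per-topic mutex, creating it on first use. The topicLockMap
// itself is guarded by lockOfTopicLockMap, which is released before the per-topic lock
// is taken so that operations on different topics do not block each other.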
713func (sc *SaramaClient) lockTopic(topic *Topic) {
714 sc.lockOfTopicLockMap.Lock()
715 if _, exist := sc.topicLockMap[topic.Name]; exist {
716 sc.lockOfTopicLockMap.Unlock()
717 sc.topicLockMap[topic.Name].Lock()
718 } else {
719 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
720 sc.lockOfTopicLockMap.Unlock()
721 sc.topicLockMap[topic.Name].Lock()
722 }
723}
724
725func (sc *SaramaClient) unLockTopic(topic *Topic) {
726 sc.lockOfTopicLockMap.Lock()
727 defer sc.lockOfTopicLockMap.Unlock()
728 if _, exist := sc.topicLockMap[topic.Name]; exist {
729 sc.topicLockMap[topic.Name].Unlock()
730 }
731}
732
733func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
734 sc.lockTopicToConsumerChannelMap.Lock()
735 defer sc.lockTopicToConsumerChannelMap.Unlock()
736 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
737 sc.topicToConsumerChannelMap[id] = arg
738 }
739}
740
741func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
742 sc.lockTopicToConsumerChannelMap.RLock()
743 defer sc.lockTopicToConsumerChannelMap.RUnlock()
744
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 return consumerCh
747 }
748 return nil
749}
750
751func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
752 sc.lockTopicToConsumerChannelMap.Lock()
753 defer sc.lockTopicToConsumerChannelMap.Unlock()
754 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
755 consumerCh.channels = append(consumerCh.channels, ch)
756 return
757 }
758 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
759}
760
761//closeConsumers closes a list of sarama consumers. The consumers can be either partition consumers or group consumers
762func closeConsumers(consumers []interface{}) error {
763 var err error
764 for _, consumer := range consumers {
765		// Is it a partition consumer?
766		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
767			if errTemp := partitionConsumer.Close(); errTemp != nil {
768				logger.Debugw("closing-partition-consumer-failed", log.Fields{"err": errTemp})
769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
777 if errTemp := groupConsumer.Close(); errTemp != nil {
778 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
779 // This can occur on race condition
780 err = nil
781 } else {
782 err = errTemp
783 }
784 }
785 }
786 }
787 return err
788}
789
790func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
791 sc.lockTopicToConsumerChannelMap.Lock()
792 defer sc.lockTopicToConsumerChannelMap.Unlock()
793 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
794 // Channel will be closed in the removeChannel method
795 consumerCh.channels = removeChannel(consumerCh.channels, ch)
796 // If there are no more channels then we can close the consumers itself
797 if len(consumerCh.channels) == 0 {
798 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
799 err := closeConsumers(consumerCh.consumers)
800 //err := consumerCh.consumers.Close()
801 delete(sc.topicToConsumerChannelMap, topic.Name)
802 return err
803 }
804 return nil
805 }
806 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
807 return errors.New("topic-does-not-exist")
808}
809
810func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
811 sc.lockTopicToConsumerChannelMap.Lock()
812 defer sc.lockTopicToConsumerChannelMap.Unlock()
813 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
814 for _, ch := range consumerCh.channels {
815 // Channel will be closed in the removeChannel method
816 removeChannel(consumerCh.channels, ch)
817 }
818 err := closeConsumers(consumerCh.consumers)
819 //if err == sarama.ErrUnknownTopicOrPartition {
820 // // Not an error
821 // err = nil
822 //}
823 //err := consumerCh.consumers.Close()
824 delete(sc.topicToConsumerChannelMap, topic.Name)
825 return err
826 }
827 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
828 return nil
829}
830
831//createPublisher creates the publisher which is used to send a message onto kafka
832func (sc *SaramaClient) createPublisher() error {
833 // This Creates the publisher
834 config := sarama.NewConfig()
835 config.Producer.Partitioner = sarama.NewRandomPartitioner
836 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
837 config.Producer.Flush.Messages = sc.producerFlushMessages
838 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
839 config.Producer.Return.Errors = sc.producerReturnErrors
840 config.Producer.Return.Successes = sc.producerReturnSuccess
841 //config.Producer.RequiredAcks = sarama.WaitForAll
842 config.Producer.RequiredAcks = sarama.WaitForLocal
843
844 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
845 brokers := []string{kafkaFullAddr}
846
847 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
848 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
849 return err
850 } else {
851 sc.producer = producer
852 }
853 logger.Info("Kafka-publisher-created")
854 return nil
855}
856
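// createConsumer creates the root sarama consumer from which per-partition consumers
// are derived when the client is configured with the partition consumer type.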
857func (sc *SaramaClient) createConsumer() error {
858 config := sarama.NewConfig()
859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
864 config.Metadata.Retry.Max = sc.metadataMaxRetry
865 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
866 brokers := []string{kafkaFullAddr}
867
868 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
869 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
870 return err
871 } else {
872 sc.consumer = consumer
873 }
874 logger.Info("Kafka-consumers-created")
875 return nil
876}
877
878// createGroupConsumer creates a consumers group
879func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
880 config := scc.NewConfig()
881 config.ClientID = uuid.New().String()
882 config.Group.Mode = scc.ConsumerModeMultiplex
883 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
884 config.Consumer.Return.Errors = true
885 //config.Group.Return.Notifications = false
886 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
887 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
888 config.Consumer.Offsets.Initial = initialOffset
889 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
890 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
891 brokers := []string{kafkaFullAddr}
892
893 topics := []string{topic.Name}
894 var consumer *scc.Consumer
895 var err error
896
897 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
898 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
899 return nil, err
900 }
901 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
902
903 //sc.groupConsumers[topic.Name] = consumer
904 sc.addToGroupConsumers(topic.Name, consumer)
905 return consumer, nil
906}
907
908// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
909// topic via the unique channel each subscriber received during subscription
910func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
911 // Need to go over all channels and publish messages to them - do we need to copy msg?
912 sc.lockTopicToConsumerChannelMap.RLock()
913 for _, ch := range consumerCh.channels {
914 go func(c chan *ic.InterContainerMessage) {
915 c <- protoMessage
916 }(ch)
917 }
918 sc.lockTopicToConsumerChannelMap.RUnlock()
919
920 if callback := sc.metadataCallback; callback != nil {
921		ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
922 callback(protoMessage.Header.FromTopic, ts)
923	}
924}
925
926func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
927 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
928startloop:
929 for {
930 select {
931 case err, ok := <-consumer.Errors():
932 if ok {
933 if sc.isLivenessError(err) {
934 sc.updateLiveness(false)
935 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
936 }
937 } else {
938 // Channel is closed
939 break startloop
940 }
941 case msg, ok := <-consumer.Messages():
942 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
943 if !ok {
944 // channel is closed
945 break startloop
946 }
947 msgBody := msg.Value
948 sc.updateLiveness(true)
949 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
950 icm := &ic.InterContainerMessage{}
951 if err := proto.Unmarshal(msgBody, icm); err != nil {
952 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
953 continue
954 }
955 go sc.dispatchToConsumers(consumerChnls, icm)
956 case <-sc.doneCh:
957 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
958 break startloop
959 }
960 }
961 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
962 sc.setUnhealthy()
963}
964
965func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
966 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
967
968startloop:
969 for {
970 select {
971 case err, ok := <-consumer.Errors():
972 if ok {
973 if sc.isLivenessError(err) {
974 sc.updateLiveness(false)
975 }
976 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
977 } else {
978 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
979 // channel is closed
980 break startloop
981 }
982 case msg, ok := <-consumer.Messages():
983 if !ok {
984 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
985 // Channel closed
986 break startloop
987 }
988 sc.updateLiveness(true)
989 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
990 msgBody := msg.Value
991 icm := &ic.InterContainerMessage{}
992 if err := proto.Unmarshal(msgBody, icm); err != nil {
993 logger.Warnw("invalid-message", log.Fields{"error": err})
994 continue
995 }
996 go sc.dispatchToConsumers(consumerChnls, icm)
997 consumer.MarkOffset(msg, "")
998 case ntf := <-consumer.Notifications():
999 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
1000 case <-sc.doneCh:
1001 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
1002 break startloop
1003 }
1004 }
1005 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
1006 sc.setUnhealthy()
1007}
1008
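// startConsumers starts one consumption goroutine per consumer registered for the
// topic, using either the partition or the group consumption loop depending on the
// consumer's concrete type.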
1009func (sc *SaramaClient) startConsumers(topic *Topic) error {
1010 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
1011 var consumerCh *consumerChannels
1012 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
1013 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
1014 return errors.New("consumers-not-exist")
1015 }
1016 // For each consumer listening for that topic, start a consumption loop
1017 for _, consumer := range consumerCh.consumers {
1018 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1019 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1020 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1021 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1022 } else {
1023 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
1024 return errors.New("invalid-consumer")
1025 }
1026 }
1027 return nil
1028}
1029
1030// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1031// for that topic. It also starts the routine that listens for messages on that topic.
1032func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1033 var pConsumers []sarama.PartitionConsumer
1034 var err error
1035
1036 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
1037 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1038 return nil, err
1039 }
1040
1041 consumersIf := make([]interface{}, 0)
1042 for _, pConsumer := range pConsumers {
1043 consumersIf = append(consumersIf, pConsumer)
1044 }
1045
1046 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1047 // unbuffered to verify race conditions.
1048 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1049 cc := &consumerChannels{
1050 consumers: consumersIf,
1051 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1052 }
1053
1054 // Add the consumers channel to the map
1055 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1056
1057	//Start a consumer to listen on that specific topic
1058	go func() {
1059 if err := sc.startConsumers(topic); err != nil {
1060 logger.Errorw("start-consumers-failed", log.Fields{
1061 "topic": topic,
1062 "error": err})
1063 }
1064 }()
1065
1066 return consumerListeningChannel, nil
1067}
1068
1069// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
1070// for that topic. It also starts the routine that listens for messages on that topic.
1071func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1072	// TODO: Replace this development partition consumer with a group consumer
1073 var pConsumer *scc.Consumer
1074 var err error
1075 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1076 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1077 return nil, err
1078 }
1079 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1080 // unbuffered to verify race conditions.
1081 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1082 cc := &consumerChannels{
1083 consumers: []interface{}{pConsumer},
1084 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1085 }
1086
1087 // Add the consumers channel to the map
1088 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1089
1090	//Start a consumer to listen on that specific topic
1091	go func() {
1092 if err := sc.startConsumers(topic); err != nil {
1093 logger.Errorw("start-consumers-failed", log.Fields{
1094 "topic": topic,
1095 "error": err})
1096 }
1097 }()
1098
1099 return consumerListeningChannel, nil
1100}
1101
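// createPartitionConsumers creates one PartitionConsumer per partition of the topic,
// each starting at the supplied initial offset.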
1102func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1103 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
1104 partitionList, err := sc.consumer.Partitions(topic.Name)
1105 if err != nil {
1106 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1107 return nil, err
1108 }
1109
1110 pConsumers := make([]sarama.PartitionConsumer, 0)
1111 for _, partition := range partitionList {
1112 var pConsumer sarama.PartitionConsumer
1113 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1114 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1115 return nil, err
1116 }
1117 pConsumers = append(pConsumers, pConsumer)
1118 }
1119 return pConsumers, nil
1120}
1121
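// removeChannel closes the matching channel and removes it from the slice by swapping
// it with the last element and truncating; the order of the remaining channels is not
// preserved. If the channel is not found, the slice is returned unchanged.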
1122func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1123 var i int
1124 var channel chan *ic.InterContainerMessage
1125 for i, channel = range channels {
1126 if channel == ch {
1127 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1128 close(channel)
1129 logger.Debug("channel-closed")
1130 return channels[:len(channels)-1]
1131 }
1132 }
1133 return channels
1134}
1135
1136func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1137 sc.lockOfGroupConsumers.Lock()
1138 defer sc.lockOfGroupConsumers.Unlock()
1139 if _, exist := sc.groupConsumers[topic]; !exist {
1140 sc.groupConsumers[topic] = consumer
1141 }
1142}
1143
1144func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1145 sc.lockOfGroupConsumers.Lock()
1146 defer sc.lockOfGroupConsumers.Unlock()
1147 if _, exist := sc.groupConsumers[topic]; exist {
1148 consumer := sc.groupConsumers[topic]
1149 delete(sc.groupConsumers, topic)
1150 if err := consumer.Close(); err != nil {
1151 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
1152 return err
1153 }
1154 }
1155 return nil
1156}