blob: ff521a734f3888ffd1efc919c06d6b6520da2755 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080019 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070020 "errors"
21 "fmt"
22 "github.com/Shopify/sarama"
23 scc "github.com/bsm/sarama-cluster"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080024 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070025 "github.com/golang/protobuf/proto"
26 "github.com/google/uuid"
Scott Bakerce767002019-10-23 13:30:24 -070027 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerf1b096c2019-11-01 12:36:30 -070028 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "strings"
30 "sync"
31 "time"
32)
33
// init registers this package with the log package, requesting the log.JSON
// format at log.DebugLevel. Whatever AddPackage returns is discarded.
func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}
37
// returnErrorFunction is a callback that takes no arguments and reports an error.
type returnErrorFunction func() error
39
// consumerChannels represents one or more consumers listening on a kafka topic
// together with the channels of the callers subscribed to that topic. Once a
// message is received on the topic it is broadcast to every listening channel.
// Each consumer is either a partition consumer or a group consumer.
type consumerChannels struct {
	// consumers holds sarama.PartitionConsumer and/or *scc.Consumer values.
	consumers []interface{}
	// channels holds one channel per subscriber of the topic.
	channels []chan *ic.InterContainerMessage
}
47
// SaramaClient represents the messaging proxy. It bundles the sarama admin,
// producer and consumer handles with the settings used to create them and the
// per-topic bookkeeping needed to fan messages out to subscribers.
type SaramaClient struct {
	// cAdmin is set by createClusterAdmin; topics are created and deleted through it.
	cAdmin sarama.ClusterAdmin
	client sarama.Client
	// KafkaHost and KafkaPort are combined as "host:port" to form the single
	// broker address used for every connection this client opens.
	KafkaHost string
	KafkaPort int
	// producer is set by createPublisher and used by Send and SendLiveness.
	producer sarama.AsyncProducer
	// consumer is set by createConsumer.
	consumer sarama.Consumer

	// groupConsumers holds the group consumers keyed by topic name.
	// NOTE(review): presumably guarded by lockOfGroupConsumers; Stop iterates
	// it without taking that lock.
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	// consumerGroupPrefix is prepended to every group id built in Subscribe.
	consumerGroupPrefix string
	// consumerType selects which kind of consumer Subscribe sets up.
	consumerType      int
	consumerGroupName string

	// producerFlushFrequency is converted directly to a time.Duration in
	// createPublisher, without any unit multiplier.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool

	// consumerMaxwait and maxProcessingTime are treated as milliseconds by
	// createConsumer.
	consumerMaxwait   int
	maxProcessingTime int
	// numPartitions and numReplicas are used when Subscribe auto-creates a topic.
	numPartitions   int
	numReplicas     int
	autoCreateTopic bool

	// doneCh is created by Start with a buffer of one and signalled by Stop.
	doneCh chan int
	// topicToConsumerChannelMap is created by Start and guarded by
	// lockTopicToConsumerChannelMap.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// topicLockMap holds one lock per topic name; the map itself is guarded
	// by lockOfTopicLockMap.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	// metadataMaxRetry is copied into Metadata.Retry.Max by createConsumer.
	metadataMaxRetry int

	// alive is the last liveness state recorded by updateLiveness; it starts
	// out true.
	alive bool
	// liveness is nil until EnableLivenessChannel(true) creates it.
	liveness chan bool
	// livenessChannelInterval is the minimum time between two unchanged
	// liveness reports posted by updateLiveness.
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	// started is set at the end of a successful Start and cleared by Stop.
	started bool
}
85
// SaramaClientOption is a functional option applied to a SaramaClient by
// NewSaramaClient after the defaults have been set.
type SaramaClientOption func(*SaramaClient)
87
88func Host(host string) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaHost = host
91 }
92}
93
94func Port(port int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.KafkaPort = port
97 }
98}
99
100func ConsumerGroupPrefix(prefix string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupPrefix = prefix
103 }
104}
105
106func ConsumerGroupName(name string) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerGroupName = name
109 }
110}
111
112func ConsumerType(consumer int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.consumerType = consumer
115 }
116}
117
118func ProducerFlushFrequency(frequency int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushFrequency = frequency
121 }
122}
123
124func ProducerFlushMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMessages = num
127 }
128}
129
130func ProducerFlushMaxMessages(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerFlushMaxmessages = num
133 }
134}
135
136func ProducerMaxRetries(num int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryMax = num
139 }
140}
141
142func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerRetryBackOff = duration
145 }
146}
147
148func ProducerReturnOnErrors(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnErrors = opt
151 }
152}
153
154func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.producerReturnSuccess = opt
157 }
158}
159
160func ConsumerMaxWait(wait int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.consumerMaxwait = wait
163 }
164}
165
166func MaxProcessingTime(pTime int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.maxProcessingTime = pTime
169 }
170}
171
172func NumPartitions(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numPartitions = number
175 }
176}
177
178func NumReplicas(number int) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.numReplicas = number
181 }
182}
183
184func AutoCreateTopic(opt bool) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.autoCreateTopic = opt
187 }
188}
189
190func MetadatMaxRetries(retry int) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.metadataMaxRetry = retry
193 }
194}
195
Scott Baker104b67d2019-10-29 15:56:27 -0700196func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
197 return func(args *SaramaClient) {
198 args.livenessChannelInterval = opt
199 }
200}
201
Scott Baker2c1c4822019-10-16 11:02:41 -0700202func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
203 client := &SaramaClient{
204 KafkaHost: DefaultKafkaHost,
205 KafkaPort: DefaultKafkaPort,
206 }
207 client.consumerType = DefaultConsumerType
208 client.producerFlushFrequency = DefaultProducerFlushFrequency
209 client.producerFlushMessages = DefaultProducerFlushMessages
210 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
211 client.producerReturnErrors = DefaultProducerReturnErrors
212 client.producerReturnSuccess = DefaultProducerReturnSuccess
213 client.producerRetryMax = DefaultProducerRetryMax
214 client.producerRetryBackOff = DefaultProducerRetryBackoff
215 client.consumerMaxwait = DefaultConsumerMaxwait
216 client.maxProcessingTime = DefaultMaxProcessingTime
217 client.numPartitions = DefaultNumberPartitions
218 client.numReplicas = DefaultNumberReplicas
219 client.autoCreateTopic = DefaultAutoCreateTopic
220 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700221 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700222
223 for _, option := range opts {
224 option(client)
225 }
226
227 client.groupConsumers = make(map[string]*scc.Consumer)
228
229 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
230 client.topicLockMap = make(map[string]*sync.RWMutex)
231 client.lockOfTopicLockMap = sync.RWMutex{}
232 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700233
234 // alive until proven otherwise
235 client.alive = true
236
Scott Baker2c1c4822019-10-16 11:02:41 -0700237 return client
238}
239
240func (sc *SaramaClient) Start() error {
241 log.Info("Starting-kafka-sarama-client")
242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
246 var err error
247
248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
257 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
258 return err
259 }
260
261 // Create the Publisher
262 if err := sc.createPublisher(); err != nil {
263 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
264 return err
265 }
266
267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
270 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
271 return err
272 }
273 }
274
275 // Create the topic to consumers/channel map
276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
278 log.Info("kafka-sarama-client-started")
279
Scott Baker104b67d2019-10-29 15:56:27 -0700280 sc.started = true
281
Scott Baker2c1c4822019-10-16 11:02:41 -0700282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
286 log.Info("stopping-sarama-client")
287
Scott Baker104b67d2019-10-29 15:56:27 -0700288 sc.started = false
289
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
295 log.Errorw("closing-producer-failed", log.Fields{"error": err})
296 }
297 }
298
299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
301 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
302 }
303 }
304
305 for key, val := range sc.groupConsumers {
306 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
307 if err := val.Close(); err != nil {
308 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
314 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
315 }
316 }
317
318 //TODO: Clear the consumers map
319 //sc.clearConsumerChannelMap()
320
321 log.Info("sarama-client-stopped")
322}
323
324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
338 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
339 return nil
340 }
341 log.Errorw("create-topic-failure", log.Fields{"error": err})
342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
346 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
347 return nil
348}
349
350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
368 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
369 return nil
370 }
371 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
377 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
389 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
393 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
394 // Create a channel specific for that consumers and add it to the consumers channel map
395 ch := make(chan *ic.InterContainerMessage)
396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
401 var consumerListeningChannel chan *ic.InterContainerMessage
402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
408 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
409 return nil, err
410 }
411 }
412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
413 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
421 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
435 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
436 return nil, err
437 }
438
439 } else {
440 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
452 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
455 log.Errorw("failed-removing-channel", log.Fields{"error": err})
456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
458 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
459 }
460 return err
461}
462
Scott Baker104b67d2019-10-29 15:56:27 -0700463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
470 log.Info("update-liveness-channel-because-change")
471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
474 log.Info("update-liveness-channel-because-interval")
475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
482 log.Info("set-client-alive", log.Fields{"alive": alive})
483 sc.alive = alive
484 }
485}
486
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800487func (sc *SaramaClient) isLivenessError(err error) bool {
488 // Sarama producers and consumers encapsulate the error inside
489 // a ProducerError or ConsumerError struct.
490 if prodError, ok := err.(*sarama.ProducerError); ok {
491 err = prodError.Err
492 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
493 err = consumerError.Err
494 }
495
496 // Sarama-Cluster will compose the error into a ClusterError struct,
497 // which we can't do a compare by reference. To handle that, we the
498 // best we can do is compare the error strings.
499
500 switch err.Error() {
501 case context.DeadlineExceeded.Error():
502 log.Info("is-liveness-error-timeout")
503 return true
504 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
505 log.Info("is-liveness-error-no-brokers")
506 return true
507 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
508 log.Info("is-liveness-error-shutting-down")
509 return true
510 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
511 log.Info("is-liveness-error-not-available")
512 return true
513 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
514 log.Info("is-liveness-error-circuit-breaker-open")
515 return true
516 }
517
518 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
519 log.Info("is-liveness-error-connection-refused")
520 return true
521 }
522
523 // Other errors shouldn't trigger a loss of liveness
524
525 log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
526
527 return false
528}
529
Scott Baker2c1c4822019-10-16 11:02:41 -0700530// send formats and sends the request onto the kafka messaging bus.
531func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
532
533 // Assert message is a proto message
534 var protoMsg proto.Message
535 var ok bool
536 // ascertain the value interface type is a proto.Message
537 if protoMsg, ok = msg.(proto.Message); !ok {
538 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
539 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
540 }
541
542 var marshalled []byte
543 var err error
544 // Create the Sarama producer message
545 if marshalled, err = proto.Marshal(protoMsg); err != nil {
546 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
547 return err
548 }
549 key := ""
550 if len(keys) > 0 {
551 key = keys[0] // Only the first key is relevant
552 }
553 kafkaMsg := &sarama.ProducerMessage{
554 Topic: topic.Name,
555 Key: sarama.StringEncoder(key),
556 Value: sarama.ByteEncoder(marshalled),
557 }
558
559 // Send message to kafka
560 sc.producer.Input() <- kafkaMsg
561 // Wait for result
562 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
563 select {
564 case ok := <-sc.producer.Successes():
565 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700566 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700567 case notOk := <-sc.producer.Errors():
568 log.Debugw("error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800569 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700570 sc.updateLiveness(false)
571 }
572 return notOk
573 }
574 return nil
575}
576
577// Enable the liveness monitor channel. This channel will report
578// a "true" or "false" on every publish, which indicates whether
579// or not the channel is still live. This channel is then picked up
580// by the service (i.e. rw_core / ro_core) to update readiness status
581// and/or take other actions.
582func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
583 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
584 if enable {
585 if sc.liveness == nil {
586 log.Info("kafka-create-liveness-channel")
587 // At least 1, so we can immediately post to it without blocking
588 // Setting a bigger number (10) allows the monitor to fall behind
589 // without blocking others. The monitor shouldn't really fall
590 // behind...
591 sc.liveness = make(chan bool, 10)
592 // post intial state to the channel
593 sc.liveness <- sc.alive
594 }
595 } else {
596 // TODO: Think about whether we need the ability to turn off
597 // liveness monitoring
598 panic("Turning off liveness reporting is not supported")
599 }
600 return sc.liveness
601}
602
603// send an empty message on the liveness channel to check whether connectivity has
604// been restored.
605func (sc *SaramaClient) SendLiveness() error {
606 if !sc.started {
607 return fmt.Errorf("SendLiveness() called while not started")
608 }
609
610 kafkaMsg := &sarama.ProducerMessage{
611 Topic: "_liveness_test",
612 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
613 }
614
615 // Send message to kafka
616 sc.producer.Input() <- kafkaMsg
617 // Wait for result
618 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
619 select {
620 case ok := <-sc.producer.Successes():
621 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
622 sc.updateLiveness(true)
623 case notOk := <-sc.producer.Errors():
624 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800625 if sc.isLivenessError(notOk) {
Scott Baker104b67d2019-10-29 15:56:27 -0700626 sc.updateLiveness(false)
627 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700628 return notOk
629 }
630 return nil
631}
632
633// getGroupId returns the group id from the key-value args.
634func getGroupId(kvArgs ...*KVArg) string {
635 for _, arg := range kvArgs {
636 if arg.Key == GroupIdKey {
637 return arg.Value.(string)
638 }
639 }
640 return ""
641}
642
643// getOffset returns the offset from the key-value args.
644func getOffset(kvArgs ...*KVArg) int64 {
645 for _, arg := range kvArgs {
646 if arg.Key == Offset {
647 return arg.Value.(int64)
648 }
649 }
650 return sarama.OffsetNewest
651}
652
653func (sc *SaramaClient) createClusterAdmin() error {
654 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
655 config := sarama.NewConfig()
656 config.Version = sarama.V1_0_0_0
657
658 // Create a cluster Admin
659 var cAdmin sarama.ClusterAdmin
660 var err error
661 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
662 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
663 return err
664 }
665 sc.cAdmin = cAdmin
666 return nil
667}
668
669func (sc *SaramaClient) lockTopic(topic *Topic) {
670 sc.lockOfTopicLockMap.Lock()
671 if _, exist := sc.topicLockMap[topic.Name]; exist {
672 sc.lockOfTopicLockMap.Unlock()
673 sc.topicLockMap[topic.Name].Lock()
674 } else {
675 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
676 sc.lockOfTopicLockMap.Unlock()
677 sc.topicLockMap[topic.Name].Lock()
678 }
679}
680
681func (sc *SaramaClient) unLockTopic(topic *Topic) {
682 sc.lockOfTopicLockMap.Lock()
683 defer sc.lockOfTopicLockMap.Unlock()
684 if _, exist := sc.topicLockMap[topic.Name]; exist {
685 sc.topicLockMap[topic.Name].Unlock()
686 }
687}
688
689func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
690 sc.lockTopicToConsumerChannelMap.Lock()
691 defer sc.lockTopicToConsumerChannelMap.Unlock()
692 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
693 sc.topicToConsumerChannelMap[id] = arg
694 }
695}
696
697func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
698 sc.lockTopicToConsumerChannelMap.Lock()
699 defer sc.lockTopicToConsumerChannelMap.Unlock()
700 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
701 delete(sc.topicToConsumerChannelMap, id)
702 }
703}
704
705func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
706 sc.lockTopicToConsumerChannelMap.RLock()
707 defer sc.lockTopicToConsumerChannelMap.RUnlock()
708
709 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
710 return consumerCh
711 }
712 return nil
713}
714
715func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
716 sc.lockTopicToConsumerChannelMap.Lock()
717 defer sc.lockTopicToConsumerChannelMap.Unlock()
718 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
719 consumerCh.channels = append(consumerCh.channels, ch)
720 return
721 }
722 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
723}
724
725//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
726func closeConsumers(consumers []interface{}) error {
727 var err error
728 for _, consumer := range consumers {
729 // Is it a partition consumers?
730 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
731 if errTemp := partionConsumer.Close(); errTemp != nil {
732 log.Debugw("partition!!!", log.Fields{"err": errTemp})
733 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
734 // This can occur on race condition
735 err = nil
736 } else {
737 err = errTemp
738 }
739 }
740 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
741 if errTemp := groupConsumer.Close(); errTemp != nil {
742 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
743 // This can occur on race condition
744 err = nil
745 } else {
746 err = errTemp
747 }
748 }
749 }
750 }
751 return err
752}
753
754func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
755 sc.lockTopicToConsumerChannelMap.Lock()
756 defer sc.lockTopicToConsumerChannelMap.Unlock()
757 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
758 // Channel will be closed in the removeChannel method
759 consumerCh.channels = removeChannel(consumerCh.channels, ch)
760 // If there are no more channels then we can close the consumers itself
761 if len(consumerCh.channels) == 0 {
762 log.Debugw("closing-consumers", log.Fields{"topic": topic})
763 err := closeConsumers(consumerCh.consumers)
764 //err := consumerCh.consumers.Close()
765 delete(sc.topicToConsumerChannelMap, topic.Name)
766 return err
767 }
768 return nil
769 }
770 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
771 return errors.New("topic-does-not-exist")
772}
773
774func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
775 sc.lockTopicToConsumerChannelMap.Lock()
776 defer sc.lockTopicToConsumerChannelMap.Unlock()
777 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
778 for _, ch := range consumerCh.channels {
779 // Channel will be closed in the removeChannel method
780 removeChannel(consumerCh.channels, ch)
781 }
782 err := closeConsumers(consumerCh.consumers)
783 //if err == sarama.ErrUnknownTopicOrPartition {
784 // // Not an error
785 // err = nil
786 //}
787 //err := consumerCh.consumers.Close()
788 delete(sc.topicToConsumerChannelMap, topic.Name)
789 return err
790 }
791 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
792 return nil
793}
794
795func (sc *SaramaClient) clearConsumerChannelMap() error {
796 sc.lockTopicToConsumerChannelMap.Lock()
797 defer sc.lockTopicToConsumerChannelMap.Unlock()
798 var err error
799 for topic, consumerCh := range sc.topicToConsumerChannelMap {
800 for _, ch := range consumerCh.channels {
801 // Channel will be closed in the removeChannel method
802 removeChannel(consumerCh.channels, ch)
803 }
804 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
805 err = errTemp
806 }
807 //err = consumerCh.consumers.Close()
808 delete(sc.topicToConsumerChannelMap, topic)
809 }
810 return err
811}
812
813//createPublisher creates the publisher which is used to send a message onto kafka
814func (sc *SaramaClient) createPublisher() error {
815 // This Creates the publisher
816 config := sarama.NewConfig()
817 config.Producer.Partitioner = sarama.NewRandomPartitioner
818 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
819 config.Producer.Flush.Messages = sc.producerFlushMessages
820 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
821 config.Producer.Return.Errors = sc.producerReturnErrors
822 config.Producer.Return.Successes = sc.producerReturnSuccess
823 //config.Producer.RequiredAcks = sarama.WaitForAll
824 config.Producer.RequiredAcks = sarama.WaitForLocal
825
826 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
827 brokers := []string{kafkaFullAddr}
828
829 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
830 log.Errorw("error-starting-publisher", log.Fields{"error": err})
831 return err
832 } else {
833 sc.producer = producer
834 }
835 log.Info("Kafka-publisher-created")
836 return nil
837}
838
839func (sc *SaramaClient) createConsumer() error {
840 config := sarama.NewConfig()
841 config.Consumer.Return.Errors = true
842 config.Consumer.Fetch.Min = 1
843 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
844 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
845 config.Consumer.Offsets.Initial = sarama.OffsetNewest
846 config.Metadata.Retry.Max = sc.metadataMaxRetry
847 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
848 brokers := []string{kafkaFullAddr}
849
850 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
851 log.Errorw("error-starting-consumers", log.Fields{"error": err})
852 return err
853 } else {
854 sc.consumer = consumer
855 }
856 log.Info("Kafka-consumers-created")
857 return nil
858}
859
860// createGroupConsumer creates a consumers group
861func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
862 config := scc.NewConfig()
863 config.ClientID = uuid.New().String()
864 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700865 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
866 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700867 //config.Group.Return.Notifications = false
868 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
869 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
870 config.Consumer.Offsets.Initial = initialOffset
871 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
875 topics := []string{topic.Name}
876 var consumer *scc.Consumer
877 var err error
878
879 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
880 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
881 return nil, err
882 }
883 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
884
885 //sc.groupConsumers[topic.Name] = consumer
886 sc.addToGroupConsumers(topic.Name, consumer)
887 return consumer, nil
888}
889
890// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
891// topic via the unique channel each subscriber received during subscription
892func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
893 // Need to go over all channels and publish messages to them - do we need to copy msg?
894 sc.lockTopicToConsumerChannelMap.RLock()
895 defer sc.lockTopicToConsumerChannelMap.RUnlock()
896 for _, ch := range consumerCh.channels {
897 go func(c chan *ic.InterContainerMessage) {
898 c <- protoMessage
899 }(ch)
900 }
901}
902
903func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
904 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
905startloop:
906 for {
907 select {
908 case err, ok := <-consumer.Errors():
909 if ok {
cbabud4978652019-12-04 08:04:21 +0100910 if sc.isLivenessError(err) {
911 sc.updateLiveness(false)
912 log.Warnw("partition-consumers-error", log.Fields{"error": err})
913 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700914 } else {
915 // Channel is closed
916 break startloop
917 }
918 case msg, ok := <-consumer.Messages():
919 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
920 if !ok {
921 // channel is closed
922 break startloop
923 }
924 msgBody := msg.Value
cbabud4978652019-12-04 08:04:21 +0100925 sc.updateLiveness(true)
926 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700927 icm := &ic.InterContainerMessage{}
928 if err := proto.Unmarshal(msgBody, icm); err != nil {
929 log.Warnw("partition-invalid-message", log.Fields{"error": err})
930 continue
931 }
932 go sc.dispatchToConsumers(consumerChnls, icm)
933 case <-sc.doneCh:
934 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
935 break startloop
936 }
937 }
938 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
939}
940
941func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
942 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
943
944startloop:
945 for {
946 select {
947 case err, ok := <-consumer.Errors():
948 if ok {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800949 if sc.isLivenessError(err) {
950 sc.updateLiveness(false)
951 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700952 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
953 } else {
Scott Baker104b67d2019-10-29 15:56:27 -0700954 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700955 // channel is closed
956 break startloop
957 }
958 case msg, ok := <-consumer.Messages():
959 if !ok {
Scott Baker104b67d2019-10-29 15:56:27 -0700960 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700961 // Channel closed
962 break startloop
963 }
Scott Baker104b67d2019-10-29 15:56:27 -0700964 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700965 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
966 msgBody := msg.Value
967 icm := &ic.InterContainerMessage{}
968 if err := proto.Unmarshal(msgBody, icm); err != nil {
969 log.Warnw("invalid-message", log.Fields{"error": err})
970 continue
971 }
972 go sc.dispatchToConsumers(consumerChnls, icm)
973 consumer.MarkOffset(msg, "")
974 case ntf := <-consumer.Notifications():
975 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
976 case <-sc.doneCh:
977 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
978 break startloop
979 }
980 }
981 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
982}
983
984func (sc *SaramaClient) startConsumers(topic *Topic) error {
985 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
986 var consumerCh *consumerChannels
987 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
988 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
989 return errors.New("consumers-not-exist")
990 }
991 // For each consumer listening for that topic, start a consumption loop
992 for _, consumer := range consumerCh.consumers {
993 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
994 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
995 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
996 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
997 } else {
998 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
999 return errors.New("invalid-consumer")
1000 }
1001 }
1002 return nil
1003}
1004
1005//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1006//// for that topic. It also starts the routine that listens for messages on that topic.
1007func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1008 var pConsumers []sarama.PartitionConsumer
1009 var err error
1010
1011 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
1012 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1013 return nil, err
1014 }
1015
1016 consumersIf := make([]interface{}, 0)
1017 for _, pConsumer := range pConsumers {
1018 consumersIf = append(consumersIf, pConsumer)
1019 }
1020
1021 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1022 // unbuffered to verify race conditions.
1023 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1024 cc := &consumerChannels{
1025 consumers: consumersIf,
1026 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1027 }
1028
1029 // Add the consumers channel to the map
1030 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1031
1032 //Start a consumers to listen on that specific topic
1033 go sc.startConsumers(topic)
1034
1035 return consumerListeningChannel, nil
1036}
1037
1038// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1039// for that topic. It also starts the routine that listens for messages on that topic.
1040func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
1041 // TODO: Replace this development partition consumers with a group consumers
1042 var pConsumer *scc.Consumer
1043 var err error
1044 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1045 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1046 return nil, err
1047 }
1048 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1049 // unbuffered to verify race conditions.
1050 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1051 cc := &consumerChannels{
1052 consumers: []interface{}{pConsumer},
1053 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1054 }
1055
1056 // Add the consumers channel to the map
1057 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1058
1059 //Start a consumers to listen on that specific topic
1060 go sc.startConsumers(topic)
1061
1062 return consumerListeningChannel, nil
1063}
1064
1065func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1066 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
1067 partitionList, err := sc.consumer.Partitions(topic.Name)
1068 if err != nil {
1069 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1070 return nil, err
1071 }
1072
1073 pConsumers := make([]sarama.PartitionConsumer, 0)
1074 for _, partition := range partitionList {
1075 var pConsumer sarama.PartitionConsumer
1076 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1077 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1078 return nil, err
1079 }
1080 pConsumers = append(pConsumers, pConsumer)
1081 }
1082 return pConsumers, nil
1083}
1084
1085func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1086 var i int
1087 var channel chan *ic.InterContainerMessage
1088 for i, channel = range channels {
1089 if channel == ch {
1090 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1091 close(channel)
1092 log.Debug("channel-closed")
1093 return channels[:len(channels)-1]
1094 }
1095 }
1096 return channels
1097}
1098
1099func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1100 sc.lockOfGroupConsumers.Lock()
1101 defer sc.lockOfGroupConsumers.Unlock()
1102 if _, exist := sc.groupConsumers[topic]; !exist {
1103 sc.groupConsumers[topic] = consumer
1104 }
1105}
1106
1107func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1108 sc.lockOfGroupConsumers.Lock()
1109 defer sc.lockOfGroupConsumers.Unlock()
1110 if _, exist := sc.groupConsumers[topic]; exist {
1111 consumer := sc.groupConsumers[topic]
1112 delete(sc.groupConsumers, topic)
1113 if err := consumer.Close(); err != nil {
1114 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1115 return err
1116 }
1117 }
1118 return nil
1119}