blob: a251c566a7c59b800967d77dd07ec30d66ab2556 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
21 "github.com/Shopify/sarama"
22 scc "github.com/bsm/sarama-cluster"
23 "github.com/golang/protobuf/proto"
24 "github.com/google/uuid"
Scott Bakerce767002019-10-23 13:30:24 -070025 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerf1b096c2019-11-01 12:36:30 -070026 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070027 "strings"
28 "sync"
29 "time"
30)
31
// init calls log.AddPackage with the JSON format and the debug level when the
// package is loaded. Any value returned by AddPackage is discarded.
// NOTE(review): presumably this registers a package-scoped logger - confirm
// against the log package.
func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}
35
// returnErrorFunction is a function that takes no arguments and reports its
// outcome as an error.
type returnErrorFunction func() error
37
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is
// received on that topic, the consumer(s) broadcast the message to all the listening channels. A
// consumer can be either a partition consumer or a group consumer, which is why the consumers are
// stored as interface{} values.
type consumerChannels struct {
	// consumers holds the underlying sarama partition consumers or sarama-cluster group consumers.
	consumers []interface{}
	// channels holds one channel per subscriber of the topic.
	channels []chan *ic.InterContainerMessage
}
45
// SaramaClient represents the messaging proxy. It bundles the sarama cluster
// admin, producer and consumers together with the configuration values set
// through the SaramaClientOption functions.
type SaramaClient struct {
	cAdmin sarama.ClusterAdmin
	client sarama.Client
	// KafkaHost and KafkaPort form the broker address ("host:port") used for
	// the cluster admin, the producer and the consumers.
	KafkaHost string
	KafkaPort int
	producer  sarama.AsyncProducer
	consumer  sarama.Consumer
	// groupConsumers is guarded by lockOfGroupConsumers.
	groupConsumers           map[string]*scc.Consumer
	lockOfGroupConsumers     sync.RWMutex
	consumerGroupPrefix      string
	consumerType             int
	consumerGroupName        string
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	// consumerMaxwait and maxProcessingTime are interpreted as milliseconds
	// when the partition consumer is configured.
	consumerMaxwait   int
	maxProcessingTime int
	numPartitions     int
	numReplicas       int
	autoCreateTopic   bool
	// doneCh is created in Start and signalled in Stop to end the consumer loops.
	doneCh chan int
	// topicToConsumerChannelMap is created in Start and guarded by
	// lockTopicToConsumerChannelMap.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// topicLockMap holds one lock per topic name and is guarded by
	// lockOfTopicLockMap.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	metadataMaxRetry   int
	// alive is the last liveness state recorded by updateLiveness.
	alive bool
	// liveness is nil until EnableLivenessChannel(true) is called.
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	// started is set at the end of a successful Start and cleared in Stop.
	started bool
}
83
// SaramaClientOption is a functional option that mutates a SaramaClient. The
// options passed to NewSaramaClient are applied in order after the defaults
// have been set.
type SaramaClientOption func(*SaramaClient)
85
86func Host(host string) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaHost = host
89 }
90}
91
92func Port(port int) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.KafkaPort = port
95 }
96}
97
98func ConsumerGroupPrefix(prefix string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupPrefix = prefix
101 }
102}
103
104func ConsumerGroupName(name string) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerGroupName = name
107 }
108}
109
110func ConsumerType(consumer int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.consumerType = consumer
113 }
114}
115
116func ProducerFlushFrequency(frequency int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushFrequency = frequency
119 }
120}
121
122func ProducerFlushMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMessages = num
125 }
126}
127
128func ProducerFlushMaxMessages(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerFlushMaxmessages = num
131 }
132}
133
134func ProducerMaxRetries(num int) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryMax = num
137 }
138}
139
140func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
141 return func(args *SaramaClient) {
142 args.producerRetryBackOff = duration
143 }
144}
145
146func ProducerReturnOnErrors(opt bool) SaramaClientOption {
147 return func(args *SaramaClient) {
148 args.producerReturnErrors = opt
149 }
150}
151
152func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.producerReturnSuccess = opt
155 }
156}
157
158func ConsumerMaxWait(wait int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.consumerMaxwait = wait
161 }
162}
163
164func MaxProcessingTime(pTime int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.maxProcessingTime = pTime
167 }
168}
169
170func NumPartitions(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numPartitions = number
173 }
174}
175
176func NumReplicas(number int) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.numReplicas = number
179 }
180}
181
182func AutoCreateTopic(opt bool) SaramaClientOption {
183 return func(args *SaramaClient) {
184 args.autoCreateTopic = opt
185 }
186}
187
188func MetadatMaxRetries(retry int) SaramaClientOption {
189 return func(args *SaramaClient) {
190 args.metadataMaxRetry = retry
191 }
192}
193
Scott Baker104b67d2019-10-29 15:56:27 -0700194func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
195 return func(args *SaramaClient) {
196 args.livenessChannelInterval = opt
197 }
198}
199
Scott Baker2c1c4822019-10-16 11:02:41 -0700200func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
201 client := &SaramaClient{
202 KafkaHost: DefaultKafkaHost,
203 KafkaPort: DefaultKafkaPort,
204 }
205 client.consumerType = DefaultConsumerType
206 client.producerFlushFrequency = DefaultProducerFlushFrequency
207 client.producerFlushMessages = DefaultProducerFlushMessages
208 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
209 client.producerReturnErrors = DefaultProducerReturnErrors
210 client.producerReturnSuccess = DefaultProducerReturnSuccess
211 client.producerRetryMax = DefaultProducerRetryMax
212 client.producerRetryBackOff = DefaultProducerRetryBackoff
213 client.consumerMaxwait = DefaultConsumerMaxwait
214 client.maxProcessingTime = DefaultMaxProcessingTime
215 client.numPartitions = DefaultNumberPartitions
216 client.numReplicas = DefaultNumberReplicas
217 client.autoCreateTopic = DefaultAutoCreateTopic
218 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700219 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700220
221 for _, option := range opts {
222 option(client)
223 }
224
225 client.groupConsumers = make(map[string]*scc.Consumer)
226
227 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
228 client.topicLockMap = make(map[string]*sync.RWMutex)
229 client.lockOfTopicLockMap = sync.RWMutex{}
230 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700231
232 // alive until proven otherwise
233 client.alive = true
234
Scott Baker2c1c4822019-10-16 11:02:41 -0700235 return client
236}
237
238func (sc *SaramaClient) Start() error {
239 log.Info("Starting-kafka-sarama-client")
240
241 // Create the Done channel
242 sc.doneCh = make(chan int, 1)
243
244 var err error
245
246 // Add a cleanup in case of failure to startup
247 defer func() {
248 if err != nil {
249 sc.Stop()
250 }
251 }()
252
253 // Create the Cluster Admin
254 if err = sc.createClusterAdmin(); err != nil {
255 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
256 return err
257 }
258
259 // Create the Publisher
260 if err := sc.createPublisher(); err != nil {
261 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
262 return err
263 }
264
265 if sc.consumerType == DefaultConsumerType {
266 // Create the master consumers
267 if err := sc.createConsumer(); err != nil {
268 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
269 return err
270 }
271 }
272
273 // Create the topic to consumers/channel map
274 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
275
276 log.Info("kafka-sarama-client-started")
277
Scott Baker104b67d2019-10-29 15:56:27 -0700278 sc.started = true
279
Scott Baker2c1c4822019-10-16 11:02:41 -0700280 return nil
281}
282
283func (sc *SaramaClient) Stop() {
284 log.Info("stopping-sarama-client")
285
Scott Baker104b67d2019-10-29 15:56:27 -0700286 sc.started = false
287
Scott Baker2c1c4822019-10-16 11:02:41 -0700288 //Send a message over the done channel to close all long running routines
289 sc.doneCh <- 1
290
291 if sc.producer != nil {
292 if err := sc.producer.Close(); err != nil {
293 log.Errorw("closing-producer-failed", log.Fields{"error": err})
294 }
295 }
296
297 if sc.consumer != nil {
298 if err := sc.consumer.Close(); err != nil {
299 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
300 }
301 }
302
303 for key, val := range sc.groupConsumers {
304 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
305 if err := val.Close(); err != nil {
306 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
307 }
308 }
309
310 if sc.cAdmin != nil {
311 if err := sc.cAdmin.Close(); err != nil {
312 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
313 }
314 }
315
316 //TODO: Clear the consumers map
317 //sc.clearConsumerChannelMap()
318
319 log.Info("sarama-client-stopped")
320}
321
322//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
323// the invoking function must hold the lock
324func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
325 // Set the topic details
326 topicDetail := &sarama.TopicDetail{}
327 topicDetail.NumPartitions = int32(numPartition)
328 topicDetail.ReplicationFactor = int16(repFactor)
329 topicDetail.ConfigEntries = make(map[string]*string)
330 topicDetails := make(map[string]*sarama.TopicDetail)
331 topicDetails[topic.Name] = topicDetail
332
333 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
334 if err == sarama.ErrTopicAlreadyExists {
335 // Not an error
336 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
337 return nil
338 }
339 log.Errorw("create-topic-failure", log.Fields{"error": err})
340 return err
341 }
342 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
343 // do so.
344 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
345 return nil
346}
347
348//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
349// ensure no two go routines are performing operations on the same topic
350func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
351 sc.lockTopic(topic)
352 defer sc.unLockTopic(topic)
353
354 return sc.createTopic(topic, numPartition, repFactor)
355}
356
357//DeleteTopic removes a topic from the kafka Broker
358func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
359 sc.lockTopic(topic)
360 defer sc.unLockTopic(topic)
361
362 // Remove the topic from the broker
363 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
364 if err == sarama.ErrUnknownTopicOrPartition {
365 // Not an error as does not exist
366 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
367 return nil
368 }
369 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
370 return err
371 }
372
373 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
374 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
375 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
376 return err
377 }
378 return nil
379}
380
381// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
382// messages from that topic
383func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
384 sc.lockTopic(topic)
385 defer sc.unLockTopic(topic)
386
387 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
388
389 // If a consumers already exist for that topic then resuse it
390 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
391 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
392 // Create a channel specific for that consumers and add it to the consumers channel map
393 ch := make(chan *ic.InterContainerMessage)
394 sc.addChannelToConsumerChannelMap(topic, ch)
395 return ch, nil
396 }
397
398 // Register for the topic and set it up
399 var consumerListeningChannel chan *ic.InterContainerMessage
400 var err error
401
402 // Use the consumerType option to figure out the type of consumer to launch
403 if sc.consumerType == PartitionConsumer {
404 if sc.autoCreateTopic {
405 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
406 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
407 return nil, err
408 }
409 }
410 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
411 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
412 return nil, err
413 }
414 } else if sc.consumerType == GroupCustomer {
415 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
416 // does not consume from a precreated topic in some scenarios
417 //if sc.autoCreateTopic {
418 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
419 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
420 // return nil, err
421 // }
422 //}
423 //groupId := sc.consumerGroupName
424 groupId := getGroupId(kvArgs...)
425 // Include the group prefix
426 if groupId != "" {
427 groupId = sc.consumerGroupPrefix + groupId
428 } else {
429 // Need to use a unique group Id per topic
430 groupId = sc.consumerGroupPrefix + topic.Name
431 }
432 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
433 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
434 return nil, err
435 }
436
437 } else {
438 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
439 return nil, errors.New("unknown-consumer-type")
440 }
441
442 return consumerListeningChannel, nil
443}
444
445//UnSubscribe unsubscribe a consumer from a given topic
446func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
447 sc.lockTopic(topic)
448 defer sc.unLockTopic(topic)
449
450 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
451 var err error
452 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
453 log.Errorw("failed-removing-channel", log.Fields{"error": err})
454 }
455 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
456 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
457 }
458 return err
459}
460
Scott Baker104b67d2019-10-29 15:56:27 -0700461func (sc *SaramaClient) updateLiveness(alive bool) {
462 // Post a consistent stream of liveness data to the channel,
463 // so that in a live state, the core does not timeout and
464 // send a forced liveness message. Production of liveness
465 // events to the channel is rate-limited by livenessChannelInterval.
466 if sc.liveness != nil {
467 if sc.alive != alive {
468 log.Info("update-liveness-channel-because-change")
469 sc.liveness <- alive
470 sc.lastLivenessTime = time.Now()
471 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
472 log.Info("update-liveness-channel-because-interval")
473 sc.liveness <- alive
474 sc.lastLivenessTime = time.Now()
475 }
476 }
477
478 // Only emit a log message when the state changes
479 if sc.alive != alive {
480 log.Info("set-client-alive", log.Fields{"alive": alive})
481 sc.alive = alive
482 }
483}
484
Scott Baker2c1c4822019-10-16 11:02:41 -0700485// send formats and sends the request onto the kafka messaging bus.
486func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
487
488 // Assert message is a proto message
489 var protoMsg proto.Message
490 var ok bool
491 // ascertain the value interface type is a proto.Message
492 if protoMsg, ok = msg.(proto.Message); !ok {
493 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
494 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
495 }
496
497 var marshalled []byte
498 var err error
499 // Create the Sarama producer message
500 if marshalled, err = proto.Marshal(protoMsg); err != nil {
501 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
502 return err
503 }
504 key := ""
505 if len(keys) > 0 {
506 key = keys[0] // Only the first key is relevant
507 }
508 kafkaMsg := &sarama.ProducerMessage{
509 Topic: topic.Name,
510 Key: sarama.StringEncoder(key),
511 Value: sarama.ByteEncoder(marshalled),
512 }
513
514 // Send message to kafka
515 sc.producer.Input() <- kafkaMsg
516 // Wait for result
517 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
518 select {
519 case ok := <-sc.producer.Successes():
520 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Baker104b67d2019-10-29 15:56:27 -0700521 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700522 case notOk := <-sc.producer.Errors():
523 log.Debugw("error-sending", log.Fields{"status": notOk})
Scott Baker104b67d2019-10-29 15:56:27 -0700524 if strings.Contains(notOk.Error(), "Failed to produce") {
525 sc.updateLiveness(false)
526 }
527 return notOk
528 }
529 return nil
530}
531
532// Enable the liveness monitor channel. This channel will report
533// a "true" or "false" on every publish, which indicates whether
534// or not the channel is still live. This channel is then picked up
535// by the service (i.e. rw_core / ro_core) to update readiness status
536// and/or take other actions.
537func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
538 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
539 if enable {
540 if sc.liveness == nil {
541 log.Info("kafka-create-liveness-channel")
542 // At least 1, so we can immediately post to it without blocking
543 // Setting a bigger number (10) allows the monitor to fall behind
544 // without blocking others. The monitor shouldn't really fall
545 // behind...
546 sc.liveness = make(chan bool, 10)
547 // post intial state to the channel
548 sc.liveness <- sc.alive
549 }
550 } else {
551 // TODO: Think about whether we need the ability to turn off
552 // liveness monitoring
553 panic("Turning off liveness reporting is not supported")
554 }
555 return sc.liveness
556}
557
558// send an empty message on the liveness channel to check whether connectivity has
559// been restored.
560func (sc *SaramaClient) SendLiveness() error {
561 if !sc.started {
562 return fmt.Errorf("SendLiveness() called while not started")
563 }
564
565 kafkaMsg := &sarama.ProducerMessage{
566 Topic: "_liveness_test",
567 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
568 }
569
570 // Send message to kafka
571 sc.producer.Input() <- kafkaMsg
572 // Wait for result
573 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
574 select {
575 case ok := <-sc.producer.Successes():
576 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
577 sc.updateLiveness(true)
578 case notOk := <-sc.producer.Errors():
579 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
580 if strings.Contains(notOk.Error(), "Failed to produce") {
581 sc.updateLiveness(false)
582 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700583 return notOk
584 }
585 return nil
586}
587
588// getGroupId returns the group id from the key-value args.
589func getGroupId(kvArgs ...*KVArg) string {
590 for _, arg := range kvArgs {
591 if arg.Key == GroupIdKey {
592 return arg.Value.(string)
593 }
594 }
595 return ""
596}
597
598// getOffset returns the offset from the key-value args.
599func getOffset(kvArgs ...*KVArg) int64 {
600 for _, arg := range kvArgs {
601 if arg.Key == Offset {
602 return arg.Value.(int64)
603 }
604 }
605 return sarama.OffsetNewest
606}
607
608func (sc *SaramaClient) createClusterAdmin() error {
609 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
610 config := sarama.NewConfig()
611 config.Version = sarama.V1_0_0_0
612
613 // Create a cluster Admin
614 var cAdmin sarama.ClusterAdmin
615 var err error
616 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
617 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
618 return err
619 }
620 sc.cAdmin = cAdmin
621 return nil
622}
623
624func (sc *SaramaClient) lockTopic(topic *Topic) {
625 sc.lockOfTopicLockMap.Lock()
626 if _, exist := sc.topicLockMap[topic.Name]; exist {
627 sc.lockOfTopicLockMap.Unlock()
628 sc.topicLockMap[topic.Name].Lock()
629 } else {
630 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
631 sc.lockOfTopicLockMap.Unlock()
632 sc.topicLockMap[topic.Name].Lock()
633 }
634}
635
636func (sc *SaramaClient) unLockTopic(topic *Topic) {
637 sc.lockOfTopicLockMap.Lock()
638 defer sc.lockOfTopicLockMap.Unlock()
639 if _, exist := sc.topicLockMap[topic.Name]; exist {
640 sc.topicLockMap[topic.Name].Unlock()
641 }
642}
643
644func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
645 sc.lockTopicToConsumerChannelMap.Lock()
646 defer sc.lockTopicToConsumerChannelMap.Unlock()
647 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
648 sc.topicToConsumerChannelMap[id] = arg
649 }
650}
651
652func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
653 sc.lockTopicToConsumerChannelMap.Lock()
654 defer sc.lockTopicToConsumerChannelMap.Unlock()
655 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
656 delete(sc.topicToConsumerChannelMap, id)
657 }
658}
659
660func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
661 sc.lockTopicToConsumerChannelMap.RLock()
662 defer sc.lockTopicToConsumerChannelMap.RUnlock()
663
664 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
665 return consumerCh
666 }
667 return nil
668}
669
670func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
671 sc.lockTopicToConsumerChannelMap.Lock()
672 defer sc.lockTopicToConsumerChannelMap.Unlock()
673 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
674 consumerCh.channels = append(consumerCh.channels, ch)
675 return
676 }
677 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
678}
679
680//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
681func closeConsumers(consumers []interface{}) error {
682 var err error
683 for _, consumer := range consumers {
684 // Is it a partition consumers?
685 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
686 if errTemp := partionConsumer.Close(); errTemp != nil {
687 log.Debugw("partition!!!", log.Fields{"err": errTemp})
688 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
689 // This can occur on race condition
690 err = nil
691 } else {
692 err = errTemp
693 }
694 }
695 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
696 if errTemp := groupConsumer.Close(); errTemp != nil {
697 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
698 // This can occur on race condition
699 err = nil
700 } else {
701 err = errTemp
702 }
703 }
704 }
705 }
706 return err
707}
708
709func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
710 sc.lockTopicToConsumerChannelMap.Lock()
711 defer sc.lockTopicToConsumerChannelMap.Unlock()
712 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
713 // Channel will be closed in the removeChannel method
714 consumerCh.channels = removeChannel(consumerCh.channels, ch)
715 // If there are no more channels then we can close the consumers itself
716 if len(consumerCh.channels) == 0 {
717 log.Debugw("closing-consumers", log.Fields{"topic": topic})
718 err := closeConsumers(consumerCh.consumers)
719 //err := consumerCh.consumers.Close()
720 delete(sc.topicToConsumerChannelMap, topic.Name)
721 return err
722 }
723 return nil
724 }
725 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
726 return errors.New("topic-does-not-exist")
727}
728
729func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
730 sc.lockTopicToConsumerChannelMap.Lock()
731 defer sc.lockTopicToConsumerChannelMap.Unlock()
732 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
733 for _, ch := range consumerCh.channels {
734 // Channel will be closed in the removeChannel method
735 removeChannel(consumerCh.channels, ch)
736 }
737 err := closeConsumers(consumerCh.consumers)
738 //if err == sarama.ErrUnknownTopicOrPartition {
739 // // Not an error
740 // err = nil
741 //}
742 //err := consumerCh.consumers.Close()
743 delete(sc.topicToConsumerChannelMap, topic.Name)
744 return err
745 }
746 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
747 return nil
748}
749
750func (sc *SaramaClient) clearConsumerChannelMap() error {
751 sc.lockTopicToConsumerChannelMap.Lock()
752 defer sc.lockTopicToConsumerChannelMap.Unlock()
753 var err error
754 for topic, consumerCh := range sc.topicToConsumerChannelMap {
755 for _, ch := range consumerCh.channels {
756 // Channel will be closed in the removeChannel method
757 removeChannel(consumerCh.channels, ch)
758 }
759 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
760 err = errTemp
761 }
762 //err = consumerCh.consumers.Close()
763 delete(sc.topicToConsumerChannelMap, topic)
764 }
765 return err
766}
767
768//createPublisher creates the publisher which is used to send a message onto kafka
769func (sc *SaramaClient) createPublisher() error {
770 // This Creates the publisher
771 config := sarama.NewConfig()
772 config.Producer.Partitioner = sarama.NewRandomPartitioner
773 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
774 config.Producer.Flush.Messages = sc.producerFlushMessages
775 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
776 config.Producer.Return.Errors = sc.producerReturnErrors
777 config.Producer.Return.Successes = sc.producerReturnSuccess
778 //config.Producer.RequiredAcks = sarama.WaitForAll
779 config.Producer.RequiredAcks = sarama.WaitForLocal
780
781 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
782 brokers := []string{kafkaFullAddr}
783
784 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
785 log.Errorw("error-starting-publisher", log.Fields{"error": err})
786 return err
787 } else {
788 sc.producer = producer
789 }
790 log.Info("Kafka-publisher-created")
791 return nil
792}
793
794func (sc *SaramaClient) createConsumer() error {
795 config := sarama.NewConfig()
796 config.Consumer.Return.Errors = true
797 config.Consumer.Fetch.Min = 1
798 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
799 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
800 config.Consumer.Offsets.Initial = sarama.OffsetNewest
801 config.Metadata.Retry.Max = sc.metadataMaxRetry
802 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
803 brokers := []string{kafkaFullAddr}
804
805 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
806 log.Errorw("error-starting-consumers", log.Fields{"error": err})
807 return err
808 } else {
809 sc.consumer = consumer
810 }
811 log.Info("Kafka-consumers-created")
812 return nil
813}
814
815// createGroupConsumer creates a consumers group
816func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
817 config := scc.NewConfig()
818 config.ClientID = uuid.New().String()
819 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Baker104b67d2019-10-29 15:56:27 -0700820 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
821 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700822 //config.Group.Return.Notifications = false
823 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
824 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
825 config.Consumer.Offsets.Initial = initialOffset
826 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
827 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
828 brokers := []string{kafkaFullAddr}
829
830 topics := []string{topic.Name}
831 var consumer *scc.Consumer
832 var err error
833
834 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
835 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
836 return nil, err
837 }
838 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
839
840 //sc.groupConsumers[topic.Name] = consumer
841 sc.addToGroupConsumers(topic.Name, consumer)
842 return consumer, nil
843}
844
845// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
846// topic via the unique channel each subscriber received during subscription
847func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
848 // Need to go over all channels and publish messages to them - do we need to copy msg?
849 sc.lockTopicToConsumerChannelMap.RLock()
850 defer sc.lockTopicToConsumerChannelMap.RUnlock()
851 for _, ch := range consumerCh.channels {
852 go func(c chan *ic.InterContainerMessage) {
853 c <- protoMessage
854 }(ch)
855 }
856}
857
858func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
859 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
860startloop:
861 for {
862 select {
863 case err, ok := <-consumer.Errors():
864 if ok {
865 log.Warnw("partition-consumers-error", log.Fields{"error": err})
866 } else {
867 // Channel is closed
868 break startloop
869 }
870 case msg, ok := <-consumer.Messages():
871 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
872 if !ok {
873 // channel is closed
874 break startloop
875 }
876 msgBody := msg.Value
877 icm := &ic.InterContainerMessage{}
878 if err := proto.Unmarshal(msgBody, icm); err != nil {
879 log.Warnw("partition-invalid-message", log.Fields{"error": err})
880 continue
881 }
882 go sc.dispatchToConsumers(consumerChnls, icm)
883 case <-sc.doneCh:
884 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
885 break startloop
886 }
887 }
888 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
889}
890
891func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
892 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
893
894startloop:
895 for {
896 select {
897 case err, ok := <-consumer.Errors():
898 if ok {
Scott Baker104b67d2019-10-29 15:56:27 -0700899 sc.updateLiveness(false)
Scott Baker2c1c4822019-10-16 11:02:41 -0700900 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
901 } else {
Scott Baker104b67d2019-10-29 15:56:27 -0700902 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700903 // channel is closed
904 break startloop
905 }
906 case msg, ok := <-consumer.Messages():
907 if !ok {
Scott Baker104b67d2019-10-29 15:56:27 -0700908 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700909 // Channel closed
910 break startloop
911 }
Scott Baker104b67d2019-10-29 15:56:27 -0700912 sc.updateLiveness(true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700913 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
914 msgBody := msg.Value
915 icm := &ic.InterContainerMessage{}
916 if err := proto.Unmarshal(msgBody, icm); err != nil {
917 log.Warnw("invalid-message", log.Fields{"error": err})
918 continue
919 }
920 go sc.dispatchToConsumers(consumerChnls, icm)
921 consumer.MarkOffset(msg, "")
922 case ntf := <-consumer.Notifications():
923 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
924 case <-sc.doneCh:
925 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
926 break startloop
927 }
928 }
929 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
930}
931
932func (sc *SaramaClient) startConsumers(topic *Topic) error {
933 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
934 var consumerCh *consumerChannels
935 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
936 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
937 return errors.New("consumers-not-exist")
938 }
939 // For each consumer listening for that topic, start a consumption loop
940 for _, consumer := range consumerCh.consumers {
941 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
942 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
943 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
944 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
945 } else {
946 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
947 return errors.New("invalid-consumer")
948 }
949 }
950 return nil
951}
952
953//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
954//// for that topic. It also starts the routine that listens for messages on that topic.
955func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
956 var pConsumers []sarama.PartitionConsumer
957 var err error
958
959 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
960 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
961 return nil, err
962 }
963
964 consumersIf := make([]interface{}, 0)
965 for _, pConsumer := range pConsumers {
966 consumersIf = append(consumersIf, pConsumer)
967 }
968
969 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
970 // unbuffered to verify race conditions.
971 consumerListeningChannel := make(chan *ic.InterContainerMessage)
972 cc := &consumerChannels{
973 consumers: consumersIf,
974 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
975 }
976
977 // Add the consumers channel to the map
978 sc.addTopicToConsumerChannelMap(topic.Name, cc)
979
980 //Start a consumers to listen on that specific topic
981 go sc.startConsumers(topic)
982
983 return consumerListeningChannel, nil
984}
985
986// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
987// for that topic. It also starts the routine that listens for messages on that topic.
988func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
989 // TODO: Replace this development partition consumers with a group consumers
990 var pConsumer *scc.Consumer
991 var err error
992 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
993 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
994 return nil, err
995 }
996 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
997 // unbuffered to verify race conditions.
998 consumerListeningChannel := make(chan *ic.InterContainerMessage)
999 cc := &consumerChannels{
1000 consumers: []interface{}{pConsumer},
1001 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1002 }
1003
1004 // Add the consumers channel to the map
1005 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1006
1007 //Start a consumers to listen on that specific topic
1008 go sc.startConsumers(topic)
1009
1010 return consumerListeningChannel, nil
1011}
1012
1013func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1014 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
1015 partitionList, err := sc.consumer.Partitions(topic.Name)
1016 if err != nil {
1017 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1018 return nil, err
1019 }
1020
1021 pConsumers := make([]sarama.PartitionConsumer, 0)
1022 for _, partition := range partitionList {
1023 var pConsumer sarama.PartitionConsumer
1024 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1025 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1026 return nil, err
1027 }
1028 pConsumers = append(pConsumers, pConsumer)
1029 }
1030 return pConsumers, nil
1031}
1032
1033func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
1034 var i int
1035 var channel chan *ic.InterContainerMessage
1036 for i, channel = range channels {
1037 if channel == ch {
1038 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1039 close(channel)
1040 log.Debug("channel-closed")
1041 return channels[:len(channels)-1]
1042 }
1043 }
1044 return channels
1045}
1046
1047func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1048 sc.lockOfGroupConsumers.Lock()
1049 defer sc.lockOfGroupConsumers.Unlock()
1050 if _, exist := sc.groupConsumers[topic]; !exist {
1051 sc.groupConsumers[topic] = consumer
1052 }
1053}
1054
1055func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1056 sc.lockOfGroupConsumers.Lock()
1057 defer sc.lockOfGroupConsumers.Unlock()
1058 if _, exist := sc.groupConsumers[topic]; exist {
1059 consumer := sc.groupConsumers[topic]
1060 delete(sc.groupConsumers, topic)
1061 if err := consumer.Close(); err != nil {
1062 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1063 return err
1064 }
1065 }
1066 return nil
1067}