/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	scc "github.com/bsm/sarama-cluster"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-lib-go/v2/pkg/log"
	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
)

func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}

type returnErrorFunction func() error

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer broadcasts the message to all of the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ic.InterContainerMessage
}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumers                map[string]*scc.Consumer
	lockOfGroupConsumers          sync.RWMutex
	consumerGroupPrefix           string
	consumerType                  int
	consumerGroupName             string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int
}

// SaramaClientOption is a functional option used to configure a SaramaClient at construction time.
type SaramaClientOption func(*SaramaClient)

func Host(host string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaHost = host
	}
}

func Port(port int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaPort = port
	}
}

func ConsumerGroupPrefix(prefix string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupPrefix = prefix
	}
}

func ConsumerGroupName(name string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupName = name
	}
}

func ConsumerType(consumer int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerType = consumer
	}
}

func ProducerFlushFrequency(frequency int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushFrequency = frequency
	}
}

func ProducerFlushMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMessages = num
	}
}

func ProducerFlushMaxMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMaxmessages = num
	}
}

func ProducerMaxRetries(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryMax = num
	}
}

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryBackOff = duration
	}
}

func ProducerReturnOnErrors(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnErrors = opt
	}
}

func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnSuccess = opt
	}
}

func ConsumerMaxWait(wait int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerMaxwait = wait
	}
}

func MaxProcessingTime(pTime int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.maxProcessingTime = pTime
	}
}

func NumPartitions(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numPartitions = number
	}
}

func NumReplicas(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numReplicas = number
	}
}

func AutoCreateTopic(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.autoCreateTopic = opt
	}
}

func MetadatMaxRetries(retry int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.metadataMaxRetry = retry
	}
}

// NewSaramaClient returns a SaramaClient initialized with default values; any supplied options override those defaults.
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaHost: DefaultKafkaHost,
		KafkaPort: DefaultKafkaPort,
	}
	client.consumerType = DefaultConsumerType
	client.producerFlushFrequency = DefaultProducerFlushFrequency
	client.producerFlushMessages = DefaultProducerFlushMessages
	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
	client.producerReturnErrors = DefaultProducerReturnErrors
	client.producerReturnSuccess = DefaultProducerReturnSuccess
	client.producerRetryMax = DefaultProducerRetryMax
	client.producerRetryBackOff = DefaultProducerRetryBackoff
	client.consumerMaxwait = DefaultConsumerMaxwait
	client.maxProcessingTime = DefaultMaxProcessingTime
	client.numPartitions = DefaultNumberPartitions
	client.numReplicas = DefaultNumberReplicas
	client.autoCreateTopic = DefaultAutoCreateTopic
	client.metadataMaxRetry = DefaultMetadataMaxRetry

	for _, option := range opts {
		option(client)
	}

	client.groupConsumers = make(map[string]*scc.Consumer)

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
	client.topicLockMap = make(map[string]*sync.RWMutex)
	client.lockOfTopicLockMap = sync.RWMutex{}
	client.lockOfGroupConsumers = sync.RWMutex{}
	return client
}
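
// newClientExample is a minimal, illustrative sketch, not part of the original API: it shows how a
// client is typically assembled from functional options and started. The broker host and port values
// are hypothetical; any option not supplied keeps the default set in NewSaramaClient.
func newClientExample() (*SaramaClient, error) {
	client := NewSaramaClient(
		Host("kafka"), // hypothetical broker host
		Port(9092),    // hypothetical broker port
		// Send's select loop reads both producer result channels, so keep both returns enabled
		ProducerReturnOnErrors(true),
		ProducerReturnOnSuccess(true),
	)
	if err := client.Start(); err != nil {
		return nil, err
	}
	// The caller is expected to invoke client.Stop() when done
	return client, nil
}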

// Start initializes the client: it creates the cluster admin, the publisher and, for the default
// consumer type, the master consumer.
func (sc *SaramaClient) Start() error {
	log.Info("Starting-kafka-sarama-client")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	var err error

	// Add a cleanup in case of failure to startup
	defer func() {
		if err != nil {
			sc.Stop()
		}
	}()

	// Create the Cluster Admin
	if err = sc.createClusterAdmin(); err != nil {
		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
		return err
	}

	// Create the Publisher. Assign to the outer err (do not shadow it) so the deferred cleanup
	// above sees a failure here.
	if err = sc.createPublisher(); err != nil {
		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	if sc.consumerType == DefaultConsumerType {
		// Create the master consumers
		if err = sc.createConsumer(); err != nil {
			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
			return err
		}
	}

	// Create the topic to consumers/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	log.Info("kafka-sarama-client-started")

	return nil
}

// Stop shuts down the client, signalling all long-running routines to exit and closing the producer,
// the consumers and the cluster admin.
func (sc *SaramaClient) Stop() {
	log.Info("stopping-sarama-client")

	// Send a message over the done channel to close all long running routines
	sc.doneCh <- 1

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			log.Errorw("closing-producer-failed", log.Fields{"error": err})
		}
	}

	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
		}
	}

	for key, val := range sc.groupConsumers {
		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
		if err := val.Close(); err != nil {
			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
		}
	}

	if sc.cAdmin != nil {
		if err := sc.cAdmin.Close(); err != nil {
			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
		}
	}

	// TODO: Clear the consumers map
	//sc.clearConsumerChannelMap()

	log.Info("sarama-client-stopped")
}

// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
// the invoking function must hold the lock.
func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
	// Set the topic details
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)

	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
		if err == sarama.ErrTopicAlreadyExists {
			// Not an error
			log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("create-topic-failure", log.Fields{"error": err})
		return err
	}
	// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
	// do so.
	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}

// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
// ensure no two goroutines are performing operations on the same topic.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	return sc.createTopic(topic, numPartition, repFactor)
}

// DeleteTopic removes a topic from the Kafka broker.
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	// Remove the topic from the broker
	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
		if err == sarama.ErrUnknownTopicOrPartition {
			// Not an error as the topic does not exist
			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
		return err
	}

	// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}
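
// topicLifecycleExample is an illustrative sketch, not part of the original API. Topic creation and
// deletion are effectively idempotent against the broker: an already-existing topic on create, or a
// missing topic on delete, is logged and treated as success. The topic name and sizing are hypothetical.
func topicLifecycleExample(sc *SaramaClient) error {
	topic := &Topic{Name: "example-topic"}
	// 3 partitions with a replication factor of 1 (hypothetical values)
	if err := sc.CreateTopic(topic, 3, 1); err != nil {
		return err
	}
	// Deleting also closes any consumers still listening on the topic
	return sc.DeleteTopic(topic)
}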

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic.
func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific to that consumer and add it to the consumer channel map
		ch := make(chan *ic.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan *ic.InterContainerMessage
	var err error

	// Use the consumerType option to figure out the type of consumer to launch
	if sc.consumerType == PartitionConsumer {
		if sc.autoCreateTopic {
			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
	} else if sc.consumerType == GroupCustomer {
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		//if sc.autoCreateTopic {
		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
		//		return nil, err
		//	}
		//}
		groupId := getGroupId(kvArgs...)
		// Include the group prefix
		if groupId != "" {
			groupId = sc.consumerGroupPrefix + groupId
		} else {
			// Need to use a unique group Id per topic
			groupId = sc.consumerGroupPrefix + topic.Name
		}
		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
			return nil, err
		}
	} else {
		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return consumerListeningChannel, nil
}
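
// subscribeExample is an illustrative sketch, not part of the original API: it shows how a caller
// subscribes using the optional key-value arguments consumed by getGroupId and getOffset below. The
// group id is hypothetical; with no KVArgs at all, consumption starts at the newest offset and the
// group id defaults to the topic name.
func subscribeExample(sc *SaramaClient, topic *Topic) error {
	ch, err := sc.Subscribe(topic,
		&KVArg{Key: GroupIdKey, Value: "example-group"},// hypothetical consumer group
		&KVArg{Key: Offset, Value: sarama.OffsetOldest}, // replay the topic from the beginning
	)
	if err != nil {
		return err
	}
	// Each subscriber gets its own channel; the dispatch loop fans incoming messages out to all of them
	go func() {
		for msg := range ch {
			log.Debugw("example-message-received", log.Fields{"id": msg.Header.Id})
		}
	}()
	return nil
}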

// UnSubscribe unsubscribes a consumer from a given topic.
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	var err error
	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
		log.Errorw("failed-removing-channel", log.Fields{"error": err})
	}
	if errTemp := sc.deleteFromGroupConsumers(topic.Name); errTemp != nil {
		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": errTemp})
		// Keep the first error, if any; otherwise report this one
		if err == nil {
			err = errTemp
		}
	}
	return err
}

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
		return fmt.Errorf("not-a-proto-msg-%s", msg)
	}

	var marshalled []byte
	var err error
	// Create the Sarama producer message
	if marshalled, err = proto.Marshal(protoMsg); err != nil {
		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
		return err
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
	case notOk := <-sc.producer.Errors():
		log.Debugw("error-sending", log.Fields{"status": notOk})
		return notOk
	}
	return nil
}
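
// sendExample is an illustrative sketch, not part of the original API: Send accepts any proto.Message
// and an optional key. Note that the publisher is configured with a random partitioner (see
// createPublisher), so the key is carried on the record but does not drive partition selection here.
// The key value is hypothetical.
func sendExample(sc *SaramaClient, topic *Topic, msg proto.Message) error {
	return sc.Send(msg, topic, "device-1234")
}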

// getGroupId returns the group id from the key-value args.
func getGroupId(kvArgs ...*KVArg) string {
	for _, arg := range kvArgs {
		if arg.Key == GroupIdKey {
			return arg.Value.(string)
		}
	}
	return ""
}

// getOffset returns the offset from the key-value args.
func getOffset(kvArgs ...*KVArg) int64 {
	for _, arg := range kvArgs {
		if arg.Key == Offset {
			return arg.Value.(int64)
		}
	}
	return sarama.OffsetNewest
}

func (sc *SaramaClient) createClusterAdmin() error {
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Create a cluster Admin
	var cAdmin sarama.ClusterAdmin
	var err error
	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
		return err
	}
	sc.cAdmin = cAdmin
	return nil
}

// lockTopic locks the per-topic mutex, creating it on first use. Entries are never removed from
// topicLockMap, so it is safe to acquire the topic mutex after releasing the map lock.
func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	if _, exist := sc.topicLockMap[topic.Name]; !exist {
		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
	}
	topicLock := sc.topicLockMap[topic.Name]
	sc.lockOfTopicLockMap.Unlock()
	topicLock.Lock()
}

func (sc *SaramaClient) unLockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	defer sc.lockOfTopicLockMap.Unlock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.topicLockMap[topic.Name].Unlock()
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	// delete is a no-op if the key is absent, so no existence check is needed
	delete(sc.topicToConsumerChannelMap, id)
}

func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}

// closeConsumers closes a list of sarama consumers. Each consumer can be either a partition consumer
// or a group consumer.
func closeConsumers(consumers []interface{}) error {
	var err error
	for _, consumer := range consumers {
		// Is it a partition consumer?
		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			if errTemp := partitionConsumer.Close(); errTemp != nil {
				log.Debugw("partition-consumer-close-error", log.Fields{"err": errTemp})
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
			if errTemp := groupConsumer.Close(); errTemp != nil {
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		}
	}
	return err
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(consumerCh.channels, ch)
		// If there are no more channels then we can close the consumers itself
		if len(consumerCh.channels) == 0 {
			log.Debugw("closing-consumers", log.Fields{"topic": topic})
			err := closeConsumers(consumerCh.consumers)
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Close all the channels directly; removeChannel reorders the slice it is given, which
		// would cause entries to be skipped while ranging over that same slice.
		for _, ch := range consumerCh.channels {
			close(ch)
		}
		consumerCh.channels = nil
		err := closeConsumers(consumerCh.consumers)
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return nil
}

func (sc *SaramaClient) clearConsumerChannelMap() error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	var err error
	for topic, consumerCh := range sc.topicToConsumerChannelMap {
		// Close all the channels directly, for the same reason as in clearTopicFromConsumerChannelMap
		for _, ch := range consumerCh.channels {
			close(ch)
		}
		consumerCh.channels = nil
		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
			err = errTemp
		}
		delete(sc.topicToConsumerChannelMap, topic)
	}
	return err
}

// createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
	// Create the publisher configuration
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
	config.Producer.Flush.Messages = sc.producerFlushMessages
	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
	config.Producer.Return.Errors = sc.producerReturnErrors
	config.Producer.Return.Successes = sc.producerReturnSuccess
	//config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.RequiredAcks = sarama.WaitForLocal

	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Errorw("error-starting-publisher", log.Fields{"error": err})
		return err
	}
	sc.producer = producer
	log.Info("Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer() error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Metadata.Retry.Max = sc.metadataMaxRetry
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		log.Errorw("error-starting-consumers", log.Fields{"error": err})
		return err
	}
	sc.consumer = consumer
	log.Info("Kafka-consumers-created")
	return nil
}

// createGroupConsumer creates a consumer group
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.ClientID = uuid.New().String()
	config.Group.Mode = scc.ConsumerModeMultiplex
	//config.Consumer.Return.Errors = true
	//config.Group.Return.Notifications = false
	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = initialOffset
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
		return nil, err
	}
	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})

	sc.addToGroupConsumers(topic.Name, consumer)
	return consumer, nil
}

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription.
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()
	for _, ch := range consumerCh.channels {
		go func(c chan *ic.InterContainerMessage) {
			c <- protoMessage
		}(ch)
	}
}

func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("partition-consumers-error", log.Fields{"error": err})
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
		case <-sc.doneCh:
			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				// Channel closed
				break startloop
			}
			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) startConsumers(topic *Topic) error {
	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
		return errors.New("consumers-not-exist")
	}
	// For each consumer listening for that topic, start a consumption loop
	for _, consumer := range consumerCh.consumers {
		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
		} else {
			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
			return errors.New("invalid-consumer")
		}
	}
	return nil
}

// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	var pConsumers []sarama.PartitionConsumer
	var err error

	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	consumersIf := make([]interface{}, 0)
	for _, pConsumer := range pConsumers {
		consumersIf = append(consumersIf, pConsumer)
	}

	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: consumersIf,
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	var gConsumer *scc.Consumer
	var err error
	if gConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
		log.Errorw("creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: []interface{}{gConsumer},
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	pConsumers := make([]sarama.PartitionConsumer, 0)
	for _, partition := range partitionList {
		var pConsumer sarama.PartitionConsumer
		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
			log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
		pConsumers = append(pConsumers, pConsumer)
	}
	return pConsumers, nil
}

// removeChannel closes the given channel and removes it from the slice using a swap-with-last-and-truncate
// idiom; the caller must retain the returned slice. Note that slice order is not preserved.
func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
	var i int
	var channel chan *ic.InterContainerMessage
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			log.Debug("channel-closed")
			return channels[:len(channels)-1]
		}
	}
	return channels
}

func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; !exist {
		sc.groupConsumers[topic] = consumer
	}
}

func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if consumer, exist := sc.groupConsumers[topic]; exist {
		delete(sc.groupConsumers, topic)
		if err := consumer.Close(); err != nil {
			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
			return err
		}
	}
	return nil
}