/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
21 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
23 "github.com/google/uuid"
24 "github.com/opencord/voltha-go/common/log"
25 ic "github.com/opencord/voltha-protos/go/inter_container"
26 "gopkg.in/Shopify/sarama.v1"
27 "strings"
28 "sync"
29 "time"
30)
31
32func init() {
33 log.AddPackage(log.JSON, log.DebugLevel, nil)
34}
35
// returnErrorFunction is the signature of a parameterless callback that returns an error.
type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
41type consumerChannels struct {
42 consumers []interface{}
43 channels []chan *ic.InterContainerMessage
44}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
48 cAdmin sarama.ClusterAdmin
49 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumers map[string]*scc.Consumer
Matt Jeanneret384d8c92019-05-06 14:27:31 -040055 lockOfGroupConsumers sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040056 consumerGroupPrefix string
57 consumerType int
58 consumerGroupName string
59 producerFlushFrequency int
60 producerFlushMessages int
61 producerFlushMaxmessages int
62 producerRetryMax int
63 producerRetryBackOff time.Duration
64 producerReturnSuccess bool
65 producerReturnErrors bool
66 consumerMaxwait int
67 maxProcessingTime int
68 numPartitions int
69 numReplicas int
70 autoCreateTopic bool
71 doneCh chan int
72 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
74 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
76}
77
78type SaramaClientOption func(*SaramaClient)
79
80func Host(host string) SaramaClientOption {
81 return func(args *SaramaClient) {
82 args.KafkaHost = host
83 }
84}
85
86func Port(port int) SaramaClientOption {
87 return func(args *SaramaClient) {
88 args.KafkaPort = port
89 }
90}
91
92func ConsumerGroupPrefix(prefix string) SaramaClientOption {
93 return func(args *SaramaClient) {
94 args.consumerGroupPrefix = prefix
95 }
96}
97
98func ConsumerGroupName(name string) SaramaClientOption {
99 return func(args *SaramaClient) {
100 args.consumerGroupName = name
101 }
102}
103
104func ConsumerType(consumer int) SaramaClientOption {
105 return func(args *SaramaClient) {
106 args.consumerType = consumer
107 }
108}
109
110func ProducerFlushFrequency(frequency int) SaramaClientOption {
111 return func(args *SaramaClient) {
112 args.producerFlushFrequency = frequency
113 }
114}
115
116func ProducerFlushMessages(num int) SaramaClientOption {
117 return func(args *SaramaClient) {
118 args.producerFlushMessages = num
119 }
120}
121
122func ProducerFlushMaxMessages(num int) SaramaClientOption {
123 return func(args *SaramaClient) {
124 args.producerFlushMaxmessages = num
125 }
126}
127
128func ProducerMaxRetries(num int) SaramaClientOption {
129 return func(args *SaramaClient) {
130 args.producerRetryMax = num
131 }
132}
133
134func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
135 return func(args *SaramaClient) {
136 args.producerRetryBackOff = duration
137 }
138}
139
140func ProducerReturnOnErrors(opt bool) SaramaClientOption {
141 return func(args *SaramaClient) {
142 args.producerReturnErrors = opt
143 }
144}
145
146func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
147 return func(args *SaramaClient) {
148 args.producerReturnSuccess = opt
149 }
150}
151
152func ConsumerMaxWait(wait int) SaramaClientOption {
153 return func(args *SaramaClient) {
154 args.consumerMaxwait = wait
155 }
156}
157
158func MaxProcessingTime(pTime int) SaramaClientOption {
159 return func(args *SaramaClient) {
160 args.maxProcessingTime = pTime
161 }
162}
163
164func NumPartitions(number int) SaramaClientOption {
165 return func(args *SaramaClient) {
166 args.numPartitions = number
167 }
168}
169
170func NumReplicas(number int) SaramaClientOption {
171 return func(args *SaramaClient) {
172 args.numReplicas = number
173 }
174}
175
176func AutoCreateTopic(opt bool) SaramaClientOption {
177 return func(args *SaramaClient) {
178 args.autoCreateTopic = opt
179 }
180}
181
182func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
183 client := &SaramaClient{
184 KafkaHost: DefaultKafkaHost,
185 KafkaPort: DefaultKafkaPort,
186 }
187 client.consumerType = DefaultConsumerType
188 client.producerFlushFrequency = DefaultProducerFlushFrequency
189 client.producerFlushMessages = DefaultProducerFlushMessages
190 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
191 client.producerReturnErrors = DefaultProducerReturnErrors
192 client.producerReturnSuccess = DefaultProducerReturnSuccess
193 client.producerRetryMax = DefaultProducerRetryMax
194 client.producerRetryBackOff = DefaultProducerRetryBackoff
195 client.consumerMaxwait = DefaultConsumerMaxwait
196 client.maxProcessingTime = DefaultMaxProcessingTime
197 client.numPartitions = DefaultNumberPartitions
198 client.numReplicas = DefaultNumberReplicas
199 client.autoCreateTopic = DefaultAutoCreateTopic
200
201 for _, option := range opts {
202 option(client)
203 }
204
205 client.groupConsumers = make(map[string]*scc.Consumer)
206
207 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
208 client.topicLockMap = make(map[string]*sync.RWMutex)
209 client.lockOfTopicLockMap = sync.RWMutex{}
210 client.lockOfGroupConsumers = sync.RWMutex{}
211 return client
212}
213
214func (sc *SaramaClient) Start() error {
215 log.Info("Starting-kafka-sarama-client")
216
217 // Create the Done channel
218 sc.doneCh = make(chan int, 1)
219
220 var err error
221
222 // Create the Cluster Admin
223 if err = sc.createClusterAdmin(); err != nil {
224 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
225 return err
226 }
227
228 // Create the Publisher
229 if err := sc.createPublisher(); err != nil {
230 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
231 return err
232 }
233
234 if sc.consumerType == DefaultConsumerType {
235 // Create the master consumers
236 if err := sc.createConsumer(); err != nil {
237 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
238 return err
239 }
240 }
241
242 // Create the topic to consumers/channel map
243 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
244
245 log.Info("kafka-sarama-client-started")
246
247 return nil
248}
249
250func (sc *SaramaClient) Stop() {
251 log.Info("stopping-sarama-client")
252
253 //Send a message over the done channel to close all long running routines
254 sc.doneCh <- 1
255
256 if sc.producer != nil {
257 if err := sc.producer.Close(); err != nil {
258 log.Errorw("closing-producer-failed", log.Fields{"error": err})
259 }
260 }
261
262 if sc.consumer != nil {
263 if err := sc.consumer.Close(); err != nil {
264 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
265 }
266 }
267
268 for key, val := range sc.groupConsumers {
269 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
270 if err := val.Close(); err != nil {
271 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
272 }
273 }
274
275 if sc.cAdmin != nil {
276 if err := sc.cAdmin.Close(); err != nil {
277 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
278 }
279 }
280
281 //TODO: Clear the consumers map
282 //sc.clearConsumerChannelMap()
283
284 log.Info("sarama-client-stopped")
285}
286
287//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
288// the invoking function must hold the lock
289func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
290 // Set the topic details
291 topicDetail := &sarama.TopicDetail{}
292 topicDetail.NumPartitions = int32(numPartition)
293 topicDetail.ReplicationFactor = int16(repFactor)
294 topicDetail.ConfigEntries = make(map[string]*string)
295 topicDetails := make(map[string]*sarama.TopicDetail)
296 topicDetails[topic.Name] = topicDetail
297
298 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
299 if err == sarama.ErrTopicAlreadyExists {
300 // Not an error
301 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
302 return nil
303 }
304 log.Errorw("create-topic-failure", log.Fields{"error": err})
305 return err
306 }
307 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
308 // do so.
309 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
310 return nil
311}
312
313//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
314// ensure no two go routines are performing operations on the same topic
315func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
316 sc.lockTopic(topic)
317 defer sc.unLockTopic(topic)
318
319 return sc.createTopic(topic, numPartition, repFactor)
320}
321
322//DeleteTopic removes a topic from the kafka Broker
323func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
324 sc.lockTopic(topic)
325 defer sc.unLockTopic(topic)
326
327 // Remove the topic from the broker
328 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
329 if err == sarama.ErrUnknownTopicOrPartition {
330 // Not an error as does not exist
331 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
332 return nil
333 }
334 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
335 return err
336 }
337
338 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
339 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
340 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
341 return err
342 }
343 return nil
344}
345
346// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
347// messages from that topic
348func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
349 sc.lockTopic(topic)
350 defer sc.unLockTopic(topic)
351
352 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
353
354 // If a consumers already exist for that topic then resuse it
355 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
356 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
357 // Create a channel specific for that consumers and add it to the consumers channel map
358 ch := make(chan *ic.InterContainerMessage)
359 sc.addChannelToConsumerChannelMap(topic, ch)
360 return ch, nil
361 }
362
363 // Register for the topic and set it up
364 var consumerListeningChannel chan *ic.InterContainerMessage
365 var err error
366
367 // Use the consumerType option to figure out the type of consumer to launch
368 if sc.consumerType == PartitionConsumer {
369 if sc.autoCreateTopic {
370 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
371 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
372 return nil, err
373 }
374 }
375 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
376 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
377 return nil, err
378 }
379 } else if sc.consumerType == GroupCustomer {
380 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
381 // does not consume from a precreated topic in some scenarios
382 //if sc.autoCreateTopic {
383 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
384 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
385 // return nil, err
386 // }
387 //}
388 //groupId := sc.consumerGroupName
389 groupId := getGroupId(kvArgs...)
390 // Include the group prefix
391 if groupId != "" {
392 groupId = sc.consumerGroupPrefix + groupId
393 } else {
394 // Need to use a unique group Id per topic
395 groupId = sc.consumerGroupPrefix + topic.Name
396 }
397 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
398 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
399 return nil, err
400 }
401
402 } else {
403 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
404 return nil, errors.New("unknown-consumer-type")
405 }
406
407 return consumerListeningChannel, nil
408}
409
410//UnSubscribe unsubscribe a consumer from a given topic
411func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
412 sc.lockTopic(topic)
413 defer sc.unLockTopic(topic)
414
415 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
416 var err error
417 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
418 log.Errorw("failed-removing-channel", log.Fields{"error": err})
419 }
420 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
421 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
422 }
423 return err
424}
425
426// send formats and sends the request onto the kafka messaging bus.
427func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
428
429 // Assert message is a proto message
430 var protoMsg proto.Message
431 var ok bool
432 // ascertain the value interface type is a proto.Message
433 if protoMsg, ok = msg.(proto.Message); !ok {
434 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
435 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
436 }
437
438 var marshalled []byte
439 var err error
440 // Create the Sarama producer message
441 if marshalled, err = proto.Marshal(protoMsg); err != nil {
442 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
443 return err
444 }
445 key := ""
446 if len(keys) > 0 {
447 key = keys[0] // Only the first key is relevant
448 }
449 kafkaMsg := &sarama.ProducerMessage{
450 Topic: topic.Name,
451 Key: sarama.StringEncoder(key),
452 Value: sarama.ByteEncoder(marshalled),
453 }
454
455 // Send message to kafka
456 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400457 // Wait for result
458 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
459 select {
460 case ok := <-sc.producer.Successes():
461 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
462 case notOk := <-sc.producer.Errors():
463 log.Debugw("error-sending", log.Fields{"status": notOk})
464 return notOk
465 }
466 return nil
467}
468
469// getGroupId returns the group id from the key-value args.
470func getGroupId(kvArgs ...*KVArg) string {
471 for _, arg := range kvArgs {
472 if arg.Key == GroupIdKey {
473 return arg.Value.(string)
474 }
475 }
476 return ""
477}
478
479// getOffset returns the offset from the key-value args.
480func getOffset(kvArgs ...*KVArg) int64 {
481 for _, arg := range kvArgs {
482 if arg.Key == Offset {
483 return arg.Value.(int64)
484 }
485 }
486 return sarama.OffsetNewest
487}
488
489func (sc *SaramaClient) createClusterAdmin() error {
490 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
491 config := sarama.NewConfig()
492 config.Version = sarama.V1_0_0_0
493
494 // Create a cluster Admin
495 var cAdmin sarama.ClusterAdmin
496 var err error
497 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
498 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
499 return err
500 }
501 sc.cAdmin = cAdmin
502 return nil
503}
504
505func (sc *SaramaClient) lockTopic(topic *Topic) {
506 sc.lockOfTopicLockMap.Lock()
507 if _, exist := sc.topicLockMap[topic.Name]; exist {
508 sc.lockOfTopicLockMap.Unlock()
509 sc.topicLockMap[topic.Name].Lock()
510 } else {
511 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
512 sc.lockOfTopicLockMap.Unlock()
513 sc.topicLockMap[topic.Name].Lock()
514 }
515}
516
517func (sc *SaramaClient) unLockTopic(topic *Topic) {
518 sc.lockOfTopicLockMap.Lock()
519 defer sc.lockOfTopicLockMap.Unlock()
520 if _, exist := sc.topicLockMap[topic.Name]; exist {
521 sc.topicLockMap[topic.Name].Unlock()
522 }
523}
524
525func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
526 sc.lockTopicToConsumerChannelMap.Lock()
527 defer sc.lockTopicToConsumerChannelMap.Unlock()
528 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
529 sc.topicToConsumerChannelMap[id] = arg
530 }
531}
532
533func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
534 sc.lockTopicToConsumerChannelMap.Lock()
535 defer sc.lockTopicToConsumerChannelMap.Unlock()
536 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
537 delete(sc.topicToConsumerChannelMap, id)
538 }
539}
540
541func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
542 sc.lockTopicToConsumerChannelMap.RLock()
543 defer sc.lockTopicToConsumerChannelMap.RUnlock()
544
545 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
546 return consumerCh
547 }
548 return nil
549}
550
551func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
552 sc.lockTopicToConsumerChannelMap.Lock()
553 defer sc.lockTopicToConsumerChannelMap.Unlock()
554 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
555 consumerCh.channels = append(consumerCh.channels, ch)
556 return
557 }
558 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
559}
560
561//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
562func closeConsumers(consumers []interface{}) error {
563 var err error
564 for _, consumer := range consumers {
565 // Is it a partition consumers?
566 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
567 if errTemp := partionConsumer.Close(); errTemp != nil {
568 log.Debugw("partition!!!", log.Fields{"err": errTemp})
569 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
570 // This can occur on race condition
571 err = nil
572 } else {
573 err = errTemp
574 }
575 }
576 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
577 if errTemp := groupConsumer.Close(); errTemp != nil {
578 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
579 // This can occur on race condition
580 err = nil
581 } else {
582 err = errTemp
583 }
584 }
585 }
586 }
587 return err
588}
589
590func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
591 sc.lockTopicToConsumerChannelMap.Lock()
592 defer sc.lockTopicToConsumerChannelMap.Unlock()
593 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
594 // Channel will be closed in the removeChannel method
595 consumerCh.channels = removeChannel(consumerCh.channels, ch)
596 // If there are no more channels then we can close the consumers itself
597 if len(consumerCh.channels) == 0 {
598 log.Debugw("closing-consumers", log.Fields{"topic": topic})
599 err := closeConsumers(consumerCh.consumers)
600 //err := consumerCh.consumers.Close()
601 delete(sc.topicToConsumerChannelMap, topic.Name)
602 return err
603 }
604 return nil
605 }
606 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
607 return errors.New("topic-does-not-exist")
608}
609
610func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
611 sc.lockTopicToConsumerChannelMap.Lock()
612 defer sc.lockTopicToConsumerChannelMap.Unlock()
613 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
614 for _, ch := range consumerCh.channels {
615 // Channel will be closed in the removeChannel method
616 removeChannel(consumerCh.channels, ch)
617 }
618 err := closeConsumers(consumerCh.consumers)
619 //if err == sarama.ErrUnknownTopicOrPartition {
620 // // Not an error
621 // err = nil
622 //}
623 //err := consumerCh.consumers.Close()
624 delete(sc.topicToConsumerChannelMap, topic.Name)
625 return err
626 }
627 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
628 return nil
629}
630
631func (sc *SaramaClient) clearConsumerChannelMap() error {
632 sc.lockTopicToConsumerChannelMap.Lock()
633 defer sc.lockTopicToConsumerChannelMap.Unlock()
634 var err error
635 for topic, consumerCh := range sc.topicToConsumerChannelMap {
636 for _, ch := range consumerCh.channels {
637 // Channel will be closed in the removeChannel method
638 removeChannel(consumerCh.channels, ch)
639 }
640 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
641 err = errTemp
642 }
643 //err = consumerCh.consumers.Close()
644 delete(sc.topicToConsumerChannelMap, topic)
645 }
646 return err
647}
648
649//createPublisher creates the publisher which is used to send a message onto kafka
650func (sc *SaramaClient) createPublisher() error {
651 // This Creates the publisher
652 config := sarama.NewConfig()
653 config.Producer.Partitioner = sarama.NewRandomPartitioner
654 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
655 config.Producer.Flush.Messages = sc.producerFlushMessages
656 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
657 config.Producer.Return.Errors = sc.producerReturnErrors
658 config.Producer.Return.Successes = sc.producerReturnSuccess
659 //config.Producer.RequiredAcks = sarama.WaitForAll
660 config.Producer.RequiredAcks = sarama.WaitForLocal
661
662 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
663 brokers := []string{kafkaFullAddr}
664
665 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
666 log.Errorw("error-starting-publisher", log.Fields{"error": err})
667 return err
668 } else {
669 sc.producer = producer
670 }
671 log.Info("Kafka-publisher-created")
672 return nil
673}
674
675func (sc *SaramaClient) createConsumer() error {
676 config := sarama.NewConfig()
677 config.Consumer.Return.Errors = true
678 config.Consumer.Fetch.Min = 1
679 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
680 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
681 config.Consumer.Offsets.Initial = sarama.OffsetNewest
682 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
683 brokers := []string{kafkaFullAddr}
684
685 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
686 log.Errorw("error-starting-consumers", log.Fields{"error": err})
687 return err
688 } else {
689 sc.consumer = consumer
690 }
691 log.Info("Kafka-consumers-created")
692 return nil
693}
694
695// createGroupConsumer creates a consumers group
696func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
697 config := scc.NewConfig()
698 config.ClientID = uuid.New().String()
699 config.Group.Mode = scc.ConsumerModeMultiplex
700 //config.Consumer.Return.Errors = true
701 //config.Group.Return.Notifications = false
702 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
703 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
704 config.Consumer.Offsets.Initial = initialOffset
705 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
706 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
707 brokers := []string{kafkaFullAddr}
708
709 topics := []string{topic.Name}
710 var consumer *scc.Consumer
711 var err error
712
713 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
714 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
715 return nil, err
716 }
717 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
718
719 //sc.groupConsumers[topic.Name] = consumer
720 sc.addToGroupConsumers(topic.Name, consumer)
721 return consumer, nil
722}
723
724// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
725// topic via the unique channel each subscriber received during subscription
726func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
727 // Need to go over all channels and publish messages to them - do we need to copy msg?
728 sc.lockTopicToConsumerChannelMap.RLock()
729 defer sc.lockTopicToConsumerChannelMap.RUnlock()
730 for _, ch := range consumerCh.channels {
731 go func(c chan *ic.InterContainerMessage) {
732 c <- protoMessage
733 }(ch)
734 }
735}
736
737func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
738 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
739startloop:
740 for {
741 select {
742 case err, ok := <-consumer.Errors():
743 if ok {
744 log.Warnw("partition-consumers-error", log.Fields{"error": err})
745 } else {
746 // Channel is closed
747 break startloop
748 }
749 case msg, ok := <-consumer.Messages():
750 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
751 if !ok {
752 // channel is closed
753 break startloop
754 }
755 msgBody := msg.Value
756 icm := &ic.InterContainerMessage{}
757 if err := proto.Unmarshal(msgBody, icm); err != nil {
758 log.Warnw("partition-invalid-message", log.Fields{"error": err})
759 continue
760 }
761 go sc.dispatchToConsumers(consumerChnls, icm)
762 case <-sc.doneCh:
763 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
764 break startloop
765 }
766 }
767 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
768}
769
770func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
771 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
772
773startloop:
774 for {
775 select {
776 case err, ok := <-consumer.Errors():
777 if ok {
778 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
779 } else {
780 // channel is closed
781 break startloop
782 }
783 case msg, ok := <-consumer.Messages():
784 if !ok {
785 // Channel closed
786 break startloop
787 }
788 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
789 msgBody := msg.Value
790 icm := &ic.InterContainerMessage{}
791 if err := proto.Unmarshal(msgBody, icm); err != nil {
792 log.Warnw("invalid-message", log.Fields{"error": err})
793 continue
794 }
795 go sc.dispatchToConsumers(consumerChnls, icm)
796 consumer.MarkOffset(msg, "")
797 case ntf := <-consumer.Notifications():
798 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
799 case <-sc.doneCh:
800 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
801 break startloop
802 }
803 }
804 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
805}
806
807func (sc *SaramaClient) startConsumers(topic *Topic) error {
808 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
809 var consumerCh *consumerChannels
810 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
811 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
812 return errors.New("consumers-not-exist")
813 }
814 // For each consumer listening for that topic, start a consumption loop
815 for _, consumer := range consumerCh.consumers {
816 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
817 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
818 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
819 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
820 } else {
821 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
822 return errors.New("invalid-consumer")
823 }
824 }
825 return nil
826}
827
828//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
829//// for that topic. It also starts the routine that listens for messages on that topic.
830func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
831 var pConsumers []sarama.PartitionConsumer
832 var err error
833
834 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
835 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
836 return nil, err
837 }
838
839 consumersIf := make([]interface{}, 0)
840 for _, pConsumer := range pConsumers {
841 consumersIf = append(consumersIf, pConsumer)
842 }
843
844 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
845 // unbuffered to verify race conditions.
846 consumerListeningChannel := make(chan *ic.InterContainerMessage)
847 cc := &consumerChannels{
848 consumers: consumersIf,
849 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
850 }
851
852 // Add the consumers channel to the map
853 sc.addTopicToConsumerChannelMap(topic.Name, cc)
854
855 //Start a consumers to listen on that specific topic
856 go sc.startConsumers(topic)
857
858 return consumerListeningChannel, nil
859}
860
861// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
862// for that topic. It also starts the routine that listens for messages on that topic.
863func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
864 // TODO: Replace this development partition consumers with a group consumers
865 var pConsumer *scc.Consumer
866 var err error
867 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
868 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
869 return nil, err
870 }
871 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
872 // unbuffered to verify race conditions.
873 consumerListeningChannel := make(chan *ic.InterContainerMessage)
874 cc := &consumerChannels{
875 consumers: []interface{}{pConsumer},
876 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
877 }
878
879 // Add the consumers channel to the map
880 sc.addTopicToConsumerChannelMap(topic.Name, cc)
881
882 //Start a consumers to listen on that specific topic
883 go sc.startConsumers(topic)
884
885 return consumerListeningChannel, nil
886}
887
888func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
889 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
890 partitionList, err := sc.consumer.Partitions(topic.Name)
891 if err != nil {
892 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
893 return nil, err
894 }
895
896 pConsumers := make([]sarama.PartitionConsumer, 0)
897 for _, partition := range partitionList {
898 var pConsumer sarama.PartitionConsumer
899 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
900 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
901 return nil, err
902 }
903 pConsumers = append(pConsumers, pConsumer)
904 }
905 return pConsumers, nil
906}
907
908func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
909 var i int
910 var channel chan *ic.InterContainerMessage
911 for i, channel = range channels {
912 if channel == ch {
913 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
914 close(channel)
915 log.Debug("channel-closed")
916 return channels[:len(channels)-1]
917 }
918 }
919 return channels
920}
921
William Kurkianea869482019-04-09 15:16:11 -0400922func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
923 sc.lockOfGroupConsumers.Lock()
924 defer sc.lockOfGroupConsumers.Unlock()
925 if _, exist := sc.groupConsumers[topic]; !exist {
926 sc.groupConsumers[topic] = consumer
927 }
928}
929
930func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
931 sc.lockOfGroupConsumers.Lock()
932 defer sc.lockOfGroupConsumers.Unlock()
933 if _, exist := sc.groupConsumers[topic]; exist {
934 consumer := sc.groupConsumers[topic]
935 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400936 if err := consumer.Close(); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400937 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
938 return err
939 }
940 }
941 return nil
942}