blob: 0576da961d7004bfb60c42e643d29a9f309103bd [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
Mahir Gunyele77977b2019-06-27 05:36:22 -070021 "strings"
22 "sync"
23 "time"
24
William Kurkianea869482019-04-09 15:16:11 -040025 scc "github.com/bsm/sarama-cluster"
26 "github.com/golang/protobuf/proto"
27 "github.com/google/uuid"
28 "github.com/opencord/voltha-go/common/log"
29 ic "github.com/opencord/voltha-protos/go/inter_container"
30 "gopkg.in/Shopify/sarama.v1"
William Kurkianea869482019-04-09 15:16:11 -040031)
32
// init registers this package with the project logging package, selecting the
// JSON output type and the debug level. The third argument is passed as nil.
func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}
36
// returnErrorFunction is the signature of a callback that takes no arguments
// and reports its outcome as an error.
type returnErrorFunction func() error
38
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, it is broadcast to all the listening channels. Each consumer is either a partition consumer
// (sarama.PartitionConsumer) or a group consumer (*scc.Consumer).
type consumerChannels struct {
	// consumers holds the underlying consumers; elements are type-asserted where they are used.
	consumers []interface{}
	// channels holds one channel per subscriber of the topic.
	channels []chan *ic.InterContainerMessage
}
46
// SaramaClient represents the messaging proxy. It bundles the sarama cluster admin, producer and
// consumers together with the tunables used to configure them and the bookkeeping maps that track
// subscriptions per topic.
type SaramaClient struct {
	// cAdmin is created by createClusterAdmin and used to create and delete topics.
	cAdmin sarama.ClusterAdmin
	client sarama.Client
	// KafkaHost and KafkaPort are combined as "host:port" to form the broker address.
	KafkaHost string
	KafkaPort int
	// producer is created by createPublisher and used by Send.
	producer sarama.AsyncProducer
	// consumer is the master consumer created by createConsumer; partition consumers are derived from it.
	consumer sarama.Consumer
	// groupConsumers maps a topic name to its group consumer; guarded by lockOfGroupConsumers.
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	// consumerGroupPrefix is prepended to the group id chosen in Subscribe.
	consumerGroupPrefix string
	// consumerType selects between partition and group consumption in Subscribe.
	consumerType      int
	consumerGroupName string
	// Producer tunables, set through the Producer* options.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	// consumerMaxwait and maxProcessingTime are scaled by time.Millisecond in createConsumer.
	consumerMaxwait   int
	maxProcessingTime int
	// numPartitions, numReplicas and autoCreateTopic drive automatic topic creation in Subscribe.
	numPartitions   int
	numReplicas     int
	autoCreateTopic bool
	// doneCh is created in Start and signalled in Stop; the consumption loops exit when they receive from it.
	doneCh chan int
	// topicToConsumerChannelMap maps a topic name to its consumers and subscriber channels;
	// guarded by lockTopicToConsumerChannelMap.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// topicLockMap holds one lock per topic name; the map itself is guarded by lockOfTopicLockMap.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	// metadataMaxRetry is applied to the consumer config's Metadata.Retry.Max in createConsumer.
	metadataMaxRetry int
}
79
// SaramaClientOption is a functional option: a function that mutates a SaramaClient.
// NewSaramaClient applies the options it is given, in order, after the defaults are set.
type SaramaClientOption func(*SaramaClient)
81
82func Host(host string) SaramaClientOption {
83 return func(args *SaramaClient) {
84 args.KafkaHost = host
85 }
86}
87
88func Port(port int) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaPort = port
91 }
92}
93
94func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
Mahir Gunyele77977b2019-06-27 05:36:22 -0700184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
William Kurkianea869482019-04-09 15:16:11 -0400190func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
191 client := &SaramaClient{
192 KafkaHost: DefaultKafkaHost,
193 KafkaPort: DefaultKafkaPort,
194 }
195 client.consumerType = DefaultConsumerType
196 client.producerFlushFrequency = DefaultProducerFlushFrequency
197 client.producerFlushMessages = DefaultProducerFlushMessages
198 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
199 client.producerReturnErrors = DefaultProducerReturnErrors
200 client.producerReturnSuccess = DefaultProducerReturnSuccess
201 client.producerRetryMax = DefaultProducerRetryMax
202 client.producerRetryBackOff = DefaultProducerRetryBackoff
203 client.consumerMaxwait = DefaultConsumerMaxwait
204 client.maxProcessingTime = DefaultMaxProcessingTime
205 client.numPartitions = DefaultNumberPartitions
206 client.numReplicas = DefaultNumberReplicas
207 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700208 client.metadataMaxRetry = DefaultMetadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400209
210 for _, option := range opts {
211 option(client)
212 }
213
214 client.groupConsumers = make(map[string]*scc.Consumer)
215
216 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
217 client.topicLockMap = make(map[string]*sync.RWMutex)
218 client.lockOfTopicLockMap = sync.RWMutex{}
219 client.lockOfGroupConsumers = sync.RWMutex{}
220 return client
221}
222
223func (sc *SaramaClient) Start() error {
224 log.Info("Starting-kafka-sarama-client")
225
226 // Create the Done channel
227 sc.doneCh = make(chan int, 1)
228
229 var err error
230
231 // Create the Cluster Admin
232 if err = sc.createClusterAdmin(); err != nil {
233 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
234 return err
235 }
236
237 // Create the Publisher
238 if err := sc.createPublisher(); err != nil {
239 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
240 return err
241 }
242
243 if sc.consumerType == DefaultConsumerType {
244 // Create the master consumers
245 if err := sc.createConsumer(); err != nil {
246 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
247 return err
248 }
249 }
250
251 // Create the topic to consumers/channel map
252 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
253
254 log.Info("kafka-sarama-client-started")
255
256 return nil
257}
258
259func (sc *SaramaClient) Stop() {
260 log.Info("stopping-sarama-client")
261
262 //Send a message over the done channel to close all long running routines
263 sc.doneCh <- 1
264
265 if sc.producer != nil {
266 if err := sc.producer.Close(); err != nil {
267 log.Errorw("closing-producer-failed", log.Fields{"error": err})
268 }
269 }
270
271 if sc.consumer != nil {
272 if err := sc.consumer.Close(); err != nil {
273 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
274 }
275 }
276
277 for key, val := range sc.groupConsumers {
278 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
279 if err := val.Close(); err != nil {
280 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
281 }
282 }
283
284 if sc.cAdmin != nil {
285 if err := sc.cAdmin.Close(); err != nil {
286 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
287 }
288 }
289
290 //TODO: Clear the consumers map
291 //sc.clearConsumerChannelMap()
292
293 log.Info("sarama-client-stopped")
294}
295
296//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
297// the invoking function must hold the lock
298func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
299 // Set the topic details
300 topicDetail := &sarama.TopicDetail{}
301 topicDetail.NumPartitions = int32(numPartition)
302 topicDetail.ReplicationFactor = int16(repFactor)
303 topicDetail.ConfigEntries = make(map[string]*string)
304 topicDetails := make(map[string]*sarama.TopicDetail)
305 topicDetails[topic.Name] = topicDetail
306
307 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
308 if err == sarama.ErrTopicAlreadyExists {
309 // Not an error
310 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
311 return nil
312 }
313 log.Errorw("create-topic-failure", log.Fields{"error": err})
314 return err
315 }
316 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
317 // do so.
318 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
319 return nil
320}
321
322//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
323// ensure no two go routines are performing operations on the same topic
324func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
325 sc.lockTopic(topic)
326 defer sc.unLockTopic(topic)
327
328 return sc.createTopic(topic, numPartition, repFactor)
329}
330
331//DeleteTopic removes a topic from the kafka Broker
332func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
333 sc.lockTopic(topic)
334 defer sc.unLockTopic(topic)
335
336 // Remove the topic from the broker
337 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
338 if err == sarama.ErrUnknownTopicOrPartition {
339 // Not an error as does not exist
340 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
341 return nil
342 }
343 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
344 return err
345 }
346
347 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
348 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
349 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
350 return err
351 }
352 return nil
353}
354
355// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
356// messages from that topic
357func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
358 sc.lockTopic(topic)
359 defer sc.unLockTopic(topic)
360
361 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
362
363 // If a consumers already exist for that topic then resuse it
364 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
365 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
366 // Create a channel specific for that consumers and add it to the consumers channel map
367 ch := make(chan *ic.InterContainerMessage)
368 sc.addChannelToConsumerChannelMap(topic, ch)
369 return ch, nil
370 }
371
372 // Register for the topic and set it up
373 var consumerListeningChannel chan *ic.InterContainerMessage
374 var err error
375
376 // Use the consumerType option to figure out the type of consumer to launch
377 if sc.consumerType == PartitionConsumer {
378 if sc.autoCreateTopic {
379 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
380 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
381 return nil, err
382 }
383 }
384 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
385 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
386 return nil, err
387 }
388 } else if sc.consumerType == GroupCustomer {
389 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
390 // does not consume from a precreated topic in some scenarios
391 //if sc.autoCreateTopic {
392 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
393 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
394 // return nil, err
395 // }
396 //}
397 //groupId := sc.consumerGroupName
398 groupId := getGroupId(kvArgs...)
399 // Include the group prefix
400 if groupId != "" {
401 groupId = sc.consumerGroupPrefix + groupId
402 } else {
403 // Need to use a unique group Id per topic
404 groupId = sc.consumerGroupPrefix + topic.Name
405 }
406 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
407 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
408 return nil, err
409 }
410
411 } else {
412 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
413 return nil, errors.New("unknown-consumer-type")
414 }
415
416 return consumerListeningChannel, nil
417}
418
419//UnSubscribe unsubscribe a consumer from a given topic
420func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
421 sc.lockTopic(topic)
422 defer sc.unLockTopic(topic)
423
424 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
425 var err error
426 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
427 log.Errorw("failed-removing-channel", log.Fields{"error": err})
428 }
429 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
430 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
431 }
432 return err
433}
434
435// send formats and sends the request onto the kafka messaging bus.
436func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
437
438 // Assert message is a proto message
439 var protoMsg proto.Message
440 var ok bool
441 // ascertain the value interface type is a proto.Message
442 if protoMsg, ok = msg.(proto.Message); !ok {
443 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
444 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
445 }
446
447 var marshalled []byte
448 var err error
449 // Create the Sarama producer message
450 if marshalled, err = proto.Marshal(protoMsg); err != nil {
451 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
452 return err
453 }
454 key := ""
455 if len(keys) > 0 {
456 key = keys[0] // Only the first key is relevant
457 }
458 kafkaMsg := &sarama.ProducerMessage{
459 Topic: topic.Name,
460 Key: sarama.StringEncoder(key),
461 Value: sarama.ByteEncoder(marshalled),
462 }
463
464 // Send message to kafka
465 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400466 // Wait for result
467 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
468 select {
469 case ok := <-sc.producer.Successes():
470 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
471 case notOk := <-sc.producer.Errors():
472 log.Debugw("error-sending", log.Fields{"status": notOk})
473 return notOk
474 }
475 return nil
476}
477
478// getGroupId returns the group id from the key-value args.
479func getGroupId(kvArgs ...*KVArg) string {
480 for _, arg := range kvArgs {
481 if arg.Key == GroupIdKey {
482 return arg.Value.(string)
483 }
484 }
485 return ""
486}
487
488// getOffset returns the offset from the key-value args.
489func getOffset(kvArgs ...*KVArg) int64 {
490 for _, arg := range kvArgs {
491 if arg.Key == Offset {
492 return arg.Value.(int64)
493 }
494 }
495 return sarama.OffsetNewest
496}
497
498func (sc *SaramaClient) createClusterAdmin() error {
499 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
500 config := sarama.NewConfig()
501 config.Version = sarama.V1_0_0_0
502
503 // Create a cluster Admin
504 var cAdmin sarama.ClusterAdmin
505 var err error
506 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
507 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
508 return err
509 }
510 sc.cAdmin = cAdmin
511 return nil
512}
513
514func (sc *SaramaClient) lockTopic(topic *Topic) {
515 sc.lockOfTopicLockMap.Lock()
516 if _, exist := sc.topicLockMap[topic.Name]; exist {
517 sc.lockOfTopicLockMap.Unlock()
518 sc.topicLockMap[topic.Name].Lock()
519 } else {
520 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
521 sc.lockOfTopicLockMap.Unlock()
522 sc.topicLockMap[topic.Name].Lock()
523 }
524}
525
526func (sc *SaramaClient) unLockTopic(topic *Topic) {
527 sc.lockOfTopicLockMap.Lock()
528 defer sc.lockOfTopicLockMap.Unlock()
529 if _, exist := sc.topicLockMap[topic.Name]; exist {
530 sc.topicLockMap[topic.Name].Unlock()
531 }
532}
533
534func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
535 sc.lockTopicToConsumerChannelMap.Lock()
536 defer sc.lockTopicToConsumerChannelMap.Unlock()
537 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
538 sc.topicToConsumerChannelMap[id] = arg
539 }
540}
541
542func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
543 sc.lockTopicToConsumerChannelMap.Lock()
544 defer sc.lockTopicToConsumerChannelMap.Unlock()
545 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
546 delete(sc.topicToConsumerChannelMap, id)
547 }
548}
549
550func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
551 sc.lockTopicToConsumerChannelMap.RLock()
552 defer sc.lockTopicToConsumerChannelMap.RUnlock()
553
554 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
555 return consumerCh
556 }
557 return nil
558}
559
560func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
561 sc.lockTopicToConsumerChannelMap.Lock()
562 defer sc.lockTopicToConsumerChannelMap.Unlock()
563 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
564 consumerCh.channels = append(consumerCh.channels, ch)
565 return
566 }
567 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
568}
569
570//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
571func closeConsumers(consumers []interface{}) error {
572 var err error
573 for _, consumer := range consumers {
574 // Is it a partition consumers?
575 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
576 if errTemp := partionConsumer.Close(); errTemp != nil {
577 log.Debugw("partition!!!", log.Fields{"err": errTemp})
578 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
579 // This can occur on race condition
580 err = nil
581 } else {
582 err = errTemp
583 }
584 }
585 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
586 if errTemp := groupConsumer.Close(); errTemp != nil {
587 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
588 // This can occur on race condition
589 err = nil
590 } else {
591 err = errTemp
592 }
593 }
594 }
595 }
596 return err
597}
598
599func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
600 sc.lockTopicToConsumerChannelMap.Lock()
601 defer sc.lockTopicToConsumerChannelMap.Unlock()
602 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
603 // Channel will be closed in the removeChannel method
604 consumerCh.channels = removeChannel(consumerCh.channels, ch)
605 // If there are no more channels then we can close the consumers itself
606 if len(consumerCh.channels) == 0 {
607 log.Debugw("closing-consumers", log.Fields{"topic": topic})
608 err := closeConsumers(consumerCh.consumers)
609 //err := consumerCh.consumers.Close()
610 delete(sc.topicToConsumerChannelMap, topic.Name)
611 return err
612 }
613 return nil
614 }
615 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
616 return errors.New("topic-does-not-exist")
617}
618
619func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
620 sc.lockTopicToConsumerChannelMap.Lock()
621 defer sc.lockTopicToConsumerChannelMap.Unlock()
622 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
623 for _, ch := range consumerCh.channels {
624 // Channel will be closed in the removeChannel method
625 removeChannel(consumerCh.channels, ch)
626 }
627 err := closeConsumers(consumerCh.consumers)
628 //if err == sarama.ErrUnknownTopicOrPartition {
629 // // Not an error
630 // err = nil
631 //}
632 //err := consumerCh.consumers.Close()
633 delete(sc.topicToConsumerChannelMap, topic.Name)
634 return err
635 }
636 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
637 return nil
638}
639
640func (sc *SaramaClient) clearConsumerChannelMap() error {
641 sc.lockTopicToConsumerChannelMap.Lock()
642 defer sc.lockTopicToConsumerChannelMap.Unlock()
643 var err error
644 for topic, consumerCh := range sc.topicToConsumerChannelMap {
645 for _, ch := range consumerCh.channels {
646 // Channel will be closed in the removeChannel method
647 removeChannel(consumerCh.channels, ch)
648 }
649 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
650 err = errTemp
651 }
652 //err = consumerCh.consumers.Close()
653 delete(sc.topicToConsumerChannelMap, topic)
654 }
655 return err
656}
657
658//createPublisher creates the publisher which is used to send a message onto kafka
659func (sc *SaramaClient) createPublisher() error {
660 // This Creates the publisher
661 config := sarama.NewConfig()
662 config.Producer.Partitioner = sarama.NewRandomPartitioner
663 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
664 config.Producer.Flush.Messages = sc.producerFlushMessages
665 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
666 config.Producer.Return.Errors = sc.producerReturnErrors
667 config.Producer.Return.Successes = sc.producerReturnSuccess
668 //config.Producer.RequiredAcks = sarama.WaitForAll
669 config.Producer.RequiredAcks = sarama.WaitForLocal
670
671 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
672 brokers := []string{kafkaFullAddr}
673
674 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
675 log.Errorw("error-starting-publisher", log.Fields{"error": err})
676 return err
677 } else {
678 sc.producer = producer
679 }
680 log.Info("Kafka-publisher-created")
681 return nil
682}
683
684func (sc *SaramaClient) createConsumer() error {
685 config := sarama.NewConfig()
686 config.Consumer.Return.Errors = true
687 config.Consumer.Fetch.Min = 1
688 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
689 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
690 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700691 config.Metadata.Retry.Max = sc.metadataMaxRetry
William Kurkianea869482019-04-09 15:16:11 -0400692 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
693 brokers := []string{kafkaFullAddr}
694
695 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
696 log.Errorw("error-starting-consumers", log.Fields{"error": err})
697 return err
698 } else {
699 sc.consumer = consumer
700 }
701 log.Info("Kafka-consumers-created")
702 return nil
703}
704
705// createGroupConsumer creates a consumers group
706func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
707 config := scc.NewConfig()
708 config.ClientID = uuid.New().String()
709 config.Group.Mode = scc.ConsumerModeMultiplex
710 //config.Consumer.Return.Errors = true
711 //config.Group.Return.Notifications = false
712 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
713 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
714 config.Consumer.Offsets.Initial = initialOffset
715 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
716 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
717 brokers := []string{kafkaFullAddr}
718
719 topics := []string{topic.Name}
720 var consumer *scc.Consumer
721 var err error
722
723 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
724 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
725 return nil, err
726 }
727 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
728
729 //sc.groupConsumers[topic.Name] = consumer
730 sc.addToGroupConsumers(topic.Name, consumer)
731 return consumer, nil
732}
733
734// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
735// topic via the unique channel each subscriber received during subscription
736func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
737 // Need to go over all channels and publish messages to them - do we need to copy msg?
738 sc.lockTopicToConsumerChannelMap.RLock()
739 defer sc.lockTopicToConsumerChannelMap.RUnlock()
740 for _, ch := range consumerCh.channels {
741 go func(c chan *ic.InterContainerMessage) {
742 c <- protoMessage
743 }(ch)
744 }
745}
746
747func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
748 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
749startloop:
750 for {
751 select {
752 case err, ok := <-consumer.Errors():
753 if ok {
754 log.Warnw("partition-consumers-error", log.Fields{"error": err})
755 } else {
756 // Channel is closed
757 break startloop
758 }
759 case msg, ok := <-consumer.Messages():
760 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
761 if !ok {
762 // channel is closed
763 break startloop
764 }
765 msgBody := msg.Value
766 icm := &ic.InterContainerMessage{}
767 if err := proto.Unmarshal(msgBody, icm); err != nil {
768 log.Warnw("partition-invalid-message", log.Fields{"error": err})
769 continue
770 }
771 go sc.dispatchToConsumers(consumerChnls, icm)
772 case <-sc.doneCh:
773 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
774 break startloop
775 }
776 }
777 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
778}
779
780func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
781 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
782
783startloop:
784 for {
785 select {
786 case err, ok := <-consumer.Errors():
787 if ok {
788 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
789 } else {
790 // channel is closed
791 break startloop
792 }
793 case msg, ok := <-consumer.Messages():
794 if !ok {
795 // Channel closed
796 break startloop
797 }
798 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
799 msgBody := msg.Value
800 icm := &ic.InterContainerMessage{}
801 if err := proto.Unmarshal(msgBody, icm); err != nil {
802 log.Warnw("invalid-message", log.Fields{"error": err})
803 continue
804 }
805 go sc.dispatchToConsumers(consumerChnls, icm)
806 consumer.MarkOffset(msg, "")
807 case ntf := <-consumer.Notifications():
808 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
809 case <-sc.doneCh:
810 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
811 break startloop
812 }
813 }
814 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
815}
816
817func (sc *SaramaClient) startConsumers(topic *Topic) error {
818 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
819 var consumerCh *consumerChannels
820 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
821 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
822 return errors.New("consumers-not-exist")
823 }
824 // For each consumer listening for that topic, start a consumption loop
825 for _, consumer := range consumerCh.consumers {
826 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
827 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
828 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
829 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
830 } else {
831 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
832 return errors.New("invalid-consumer")
833 }
834 }
835 return nil
836}
837
838//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
839//// for that topic. It also starts the routine that listens for messages on that topic.
840func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
841 var pConsumers []sarama.PartitionConsumer
842 var err error
843
844 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
845 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
846 return nil, err
847 }
848
849 consumersIf := make([]interface{}, 0)
850 for _, pConsumer := range pConsumers {
851 consumersIf = append(consumersIf, pConsumer)
852 }
853
854 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
855 // unbuffered to verify race conditions.
856 consumerListeningChannel := make(chan *ic.InterContainerMessage)
857 cc := &consumerChannels{
858 consumers: consumersIf,
859 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
860 }
861
862 // Add the consumers channel to the map
863 sc.addTopicToConsumerChannelMap(topic.Name, cc)
864
865 //Start a consumers to listen on that specific topic
866 go sc.startConsumers(topic)
867
868 return consumerListeningChannel, nil
869}
870
871// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
872// for that topic. It also starts the routine that listens for messages on that topic.
873func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
874 // TODO: Replace this development partition consumers with a group consumers
875 var pConsumer *scc.Consumer
876 var err error
877 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
878 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
879 return nil, err
880 }
881 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
882 // unbuffered to verify race conditions.
883 consumerListeningChannel := make(chan *ic.InterContainerMessage)
884 cc := &consumerChannels{
885 consumers: []interface{}{pConsumer},
886 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
887 }
888
889 // Add the consumers channel to the map
890 sc.addTopicToConsumerChannelMap(topic.Name, cc)
891
892 //Start a consumers to listen on that specific topic
893 go sc.startConsumers(topic)
894
895 return consumerListeningChannel, nil
896}
897
898func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
899 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
900 partitionList, err := sc.consumer.Partitions(topic.Name)
901 if err != nil {
902 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
903 return nil, err
904 }
905
906 pConsumers := make([]sarama.PartitionConsumer, 0)
907 for _, partition := range partitionList {
908 var pConsumer sarama.PartitionConsumer
909 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
910 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
911 return nil, err
912 }
913 pConsumers = append(pConsumers, pConsumer)
914 }
915 return pConsumers, nil
916}
917
918func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
919 var i int
920 var channel chan *ic.InterContainerMessage
921 for i, channel = range channels {
922 if channel == ch {
923 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
924 close(channel)
925 log.Debug("channel-closed")
926 return channels[:len(channels)-1]
927 }
928 }
929 return channels
930}
931
William Kurkianea869482019-04-09 15:16:11 -0400932func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
933 sc.lockOfGroupConsumers.Lock()
934 defer sc.lockOfGroupConsumers.Unlock()
935 if _, exist := sc.groupConsumers[topic]; !exist {
936 sc.groupConsumers[topic] = consumer
937 }
938}
939
940func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
941 sc.lockOfGroupConsumers.Lock()
942 defer sc.lockOfGroupConsumers.Unlock()
943 if _, exist := sc.groupConsumers[topic]; exist {
944 consumer := sc.groupConsumers[topic]
945 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400946 if err := consumer.Close(); err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400947 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
948 return err
949 }
950 }
951 return nil
952}