/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	scc "github.com/bsm/sarama-cluster"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-go/common/log"
	ic "github.com/opencord/voltha-protos/go/inter_container"
	"gopkg.in/Shopify/sarama.v1"
)

func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}

type returnErrorFunction func() error

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ic.InterContainerMessage
}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumers                map[string]*scc.Consumer
	lockOfGroupConsumers          sync.RWMutex
	consumerGroupPrefix           string
	consumerType                  int
	consumerGroupName             string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int
}

type SaramaClientOption func(*SaramaClient)

func Host(host string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaHost = host
	}
}

func Port(port int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaPort = port
	}
}

func ConsumerGroupPrefix(prefix string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupPrefix = prefix
	}
}

func ConsumerGroupName(name string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupName = name
	}
}

func ConsumerType(consumer int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerType = consumer
	}
}

func ProducerFlushFrequency(frequency int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushFrequency = frequency
	}
}

func ProducerFlushMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMessages = num
	}
}

func ProducerFlushMaxMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMaxmessages = num
	}
}

func ProducerMaxRetries(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryMax = num
	}
}

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryBackOff = duration
	}
}

func ProducerReturnOnErrors(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnErrors = opt
	}
}

func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnSuccess = opt
	}
}

func ConsumerMaxWait(wait int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerMaxwait = wait
	}
}

func MaxProcessingTime(pTime int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.maxProcessingTime = pTime
	}
}

func NumPartitions(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numPartitions = number
	}
}

func NumReplicas(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numReplicas = number
	}
}

func AutoCreateTopic(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.autoCreateTopic = opt
	}
}

func MetadatMaxRetries(retry int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.metadataMaxRetry = retry
	}
}

func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaHost: DefaultKafkaHost,
		KafkaPort: DefaultKafkaPort,
	}
	client.consumerType = DefaultConsumerType
	client.producerFlushFrequency = DefaultProducerFlushFrequency
	client.producerFlushMessages = DefaultProducerFlushMessages
	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
	client.producerReturnErrors = DefaultProducerReturnErrors
	client.producerReturnSuccess = DefaultProducerReturnSuccess
	client.producerRetryMax = DefaultProducerRetryMax
	client.producerRetryBackOff = DefaultProducerRetryBackoff
	client.consumerMaxwait = DefaultConsumerMaxwait
	client.maxProcessingTime = DefaultMaxProcessingTime
	client.numPartitions = DefaultNumberPartitions
	client.numReplicas = DefaultNumberReplicas
	client.autoCreateTopic = DefaultAutoCreateTopic
	client.metadataMaxRetry = DefaultMetadataMaxRetry

	for _, option := range opts {
		option(client)
	}

	client.groupConsumers = make(map[string]*scc.Consumer)

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
	client.topicLockMap = make(map[string]*sync.RWMutex)
	client.lockOfTopicLockMap = sync.RWMutex{}
	client.lockOfGroupConsumers = sync.RWMutex{}
	return client
}
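
// Illustrative usage sketch (not part of this package's API): a client is typically built
// with the functional options above and then started. The broker address and option values
// below are assumptions for illustration only; ProducerReturnOnErrors/OnSuccess must be true
// for Send to be able to wait on the producer result channels.
//
//	kc := NewSaramaClient(
//		Host("kafka"),
//		Port(9092),
//		ConsumerType(GroupCustomer),
//		ProducerReturnOnErrors(true),
//		ProducerReturnOnSuccess(true),
//		AutoCreateTopic(true),
//	)
//	if err := kc.Start(); err != nil {
//		log.Errorw("kafka-client-start-failed", log.Fields{"error": err})
//		return
//	}
//	defer kc.Stop()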

func (sc *SaramaClient) Start() error {
	log.Info("Starting-kafka-sarama-client")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	var err error

	// Add a cleanup in case of failure to start up
	defer func() {
		if err != nil {
			sc.Stop()
		}
	}()

	// Create the Cluster Admin
	if err = sc.createClusterAdmin(); err != nil {
		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
		return err
	}

	// Create the Publisher
	if err = sc.createPublisher(); err != nil {
		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	if sc.consumerType == DefaultConsumerType {
		// Create the master consumers
		if err = sc.createConsumer(); err != nil {
			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
			return err
		}
	}

	// Create the topic to consumers/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	log.Info("kafka-sarama-client-started")

	return nil
}

func (sc *SaramaClient) Stop() {
	log.Info("stopping-sarama-client")

	// Send a message over the done channel to close all long-running routines
	sc.doneCh <- 1

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			log.Errorw("closing-producer-failed", log.Fields{"error": err})
		}
	}

	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
		}
	}

	for key, val := range sc.groupConsumers {
		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
		if err := val.Close(); err != nil {
			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
		}
	}

	if sc.cAdmin != nil {
		if err := sc.cAdmin.Close(); err != nil {
			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
		}
	}

	// TODO: Clear the consumers map
	//sc.clearConsumerChannelMap()

	log.Info("sarama-client-stopped")
}

// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
// the invoking function must hold the lock
func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
	// Set the topic details
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)
	topicDetails := make(map[string]*sarama.TopicDetail)
	topicDetails[topic.Name] = topicDetail

	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
		if err == sarama.ErrTopicAlreadyExists {
			// Not an error
			log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("create-topic-failure", log.Fields{"error": err})
		return err
	}
	// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
	// do so.
	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}

// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
// ensure no two go routines are performing operations on the same topic
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	return sc.createTopic(topic, numPartition, repFactor)
}
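
// Illustrative sketch (topic name, partition and replica counts are assumptions): creating a
// topic through the public API, which serializes with other operations on the same topic.
//
//	if err := kc.CreateTopic(&Topic{Name: "myTopic"}, 3, 1); err != nil {
//		log.Errorw("create-topic-failed", log.Fields{"error": err})
//	}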

// DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	// Remove the topic from the broker
	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
		if err == sarama.ErrUnknownTopicOrPartition {
			// Not an error as the topic does not exist
			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
		return err
	}

	// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific to that subscriber and add it to the consumers channel map
		ch := make(chan *ic.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan *ic.InterContainerMessage
	var err error

	// Use the consumerType option to figure out the type of consumer to launch
	if sc.consumerType == PartitionConsumer {
		if sc.autoCreateTopic {
			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
	} else if sc.consumerType == GroupCustomer {
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		//if sc.autoCreateTopic {
		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
		//		return nil, err
		//	}
		//}
		//groupId := sc.consumerGroupName
		groupId := getGroupId(kvArgs...)
		// Include the group prefix
		if groupId != "" {
			groupId = sc.consumerGroupPrefix + groupId
		} else {
			// Need to use a unique group Id per topic
			groupId = sc.consumerGroupPrefix + topic.Name
		}
		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
			return nil, err
		}

	} else {
		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return consumerListeningChannel, nil
}
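
// Illustrative sketch: subscribing to a topic and draining the returned channel. The topic
// name and group id are assumptions; GroupIdKey and Offset are the optional KVArg keys this
// client inspects (see getGroupId and getOffset below).
//
//	ch, err := kc.Subscribe(&Topic{Name: "myTopic"},
//		&KVArg{Key: GroupIdKey, Value: "myGroup"},
//		&KVArg{Key: Offset, Value: sarama.OffsetNewest})
//	if err != nil {
//		return err
//	}
//	go func() {
//		for msg := range ch {
//			log.Debugw("inter-container-message-received", log.Fields{"msg": msg})
//		}
//	}()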

// UnSubscribe unsubscribes a consumer from a given topic
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	var err error
	if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
		log.Errorw("failed-removing-channel", log.Fields{"error": err})
	}
	if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
	}
	return err
}

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
		return fmt.Errorf("not-a-proto-msg-%s", msg)
	}

	var marshalled []byte
	var err error
	// Create the Sarama producer message
	if marshalled, err = proto.Marshal(protoMsg); err != nil {
		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
		return err
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
	case notOk := <-sc.producer.Errors():
		log.Debugw("error-sending", log.Fields{"status": notOk})
		return notOk
	}
	return nil
}
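
// Illustrative sketch: publishing a proto message with an optional partitioning key. The
// message and key values are assumptions; any proto.Message is accepted.
//
//	icm := &ic.InterContainerMessage{}
//	if err := kc.Send(icm, &Topic{Name: "myTopic"}, "device-1234"); err != nil {
//		log.Errorw("send-failed", log.Fields{"error": err})
//	}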

// getGroupId returns the group id from the key-value args.
func getGroupId(kvArgs ...*KVArg) string {
	for _, arg := range kvArgs {
		if arg.Key == GroupIdKey {
			return arg.Value.(string)
		}
	}
	return ""
}

// getOffset returns the offset from the key-value args.
func getOffset(kvArgs ...*KVArg) int64 {
	for _, arg := range kvArgs {
		if arg.Key == Offset {
			return arg.Value.(int64)
		}
	}
	return sarama.OffsetNewest
}

func (sc *SaramaClient) createClusterAdmin() error {
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Create a cluster Admin
	var cAdmin sarama.ClusterAdmin
	var err error
	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
		return err
	}
	sc.cAdmin = cAdmin
	return nil
}

func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	} else {
		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	}
}

func (sc *SaramaClient) unLockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	defer sc.lockOfTopicLockMap.Unlock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.topicLockMap[topic.Name].Unlock()
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
		delete(sc.topicToConsumerChannelMap, id)
	}
}

func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}

// closeConsumers closes a list of sarama consumers. Each consumer can either be a partition consumer or a group consumer
func closeConsumers(consumers []interface{}) error {
	var err error
	for _, consumer := range consumers {
		// Is it a partition consumer?
		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			if errTemp := partitionConsumer.Close(); errTemp != nil {
				log.Debugw("partition-consumer-close-failure", log.Fields{"err": errTemp})
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on a race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
			if errTemp := groupConsumer.Close(); errTemp != nil {
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on a race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		}
	}
	return err
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(consumerCh.channels, ch)
		// If there are no more channels then we can close the consumers themselves
		if len(consumerCh.channels) == 0 {
			log.Debugw("closing-consumers", log.Fields{"topic": topic})
			err := closeConsumers(consumerCh.consumers)
			//err := consumerCh.consumers.Close()
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		for _, ch := range consumerCh.channels {
			// Channel will be closed in the removeChannel method
			removeChannel(consumerCh.channels, ch)
		}
		err := closeConsumers(consumerCh.consumers)
		//if err == sarama.ErrUnknownTopicOrPartition {
		//	// Not an error
		//	err = nil
		//}
		//err := consumerCh.consumers.Close()
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return nil
}

func (sc *SaramaClient) clearConsumerChannelMap() error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	var err error
	for topic, consumerCh := range sc.topicToConsumerChannelMap {
		for _, ch := range consumerCh.channels {
			// Channel will be closed in the removeChannel method
			removeChannel(consumerCh.channels, ch)
		}
		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
			err = errTemp
		}
		//err = consumerCh.consumers.Close()
		delete(sc.topicToConsumerChannelMap, topic)
	}
	return err
}

// createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
	// This creates the publisher
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
	config.Producer.Flush.Messages = sc.producerFlushMessages
	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
	config.Producer.Return.Errors = sc.producerReturnErrors
	config.Producer.Return.Successes = sc.producerReturnSuccess
	//config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.RequiredAcks = sarama.WaitForLocal

	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
		log.Errorw("error-starting-publisher", log.Fields{"error": err})
		return err
	} else {
		sc.producer = producer
	}
	log.Info("Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer() error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Metadata.Retry.Max = sc.metadataMaxRetry
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
		log.Errorw("error-starting-consumers", log.Fields{"error": err})
		return err
	} else {
		sc.consumer = consumer
	}
	log.Info("Kafka-consumers-created")
	return nil
}

// createGroupConsumer creates a consumer group
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.ClientID = uuid.New().String()
	config.Group.Mode = scc.ConsumerModeMultiplex
	//config.Consumer.Return.Errors = true
	//config.Group.Return.Notifications = false
	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = initialOffset
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
		return nil, err
	}
	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})

	//sc.groupConsumers[topic.Name] = consumer
	sc.addToGroupConsumers(topic.Name, consumer)
	return consumer, nil
}

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()
	for _, ch := range consumerCh.channels {
		go func(c chan *ic.InterContainerMessage) {
			c <- protoMessage
		}(ch)
	}
}

func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("partition-consumers-error", log.Fields{"error": err})
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
		case <-sc.doneCh:
			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				// Channel closed
				break startloop
			}
			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) startConsumers(topic *Topic) error {
	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
		return errors.New("consumers-not-exist")
	}
	// For each consumer listening for that topic, start a consumption loop
	for _, consumer := range consumerCh.consumers {
		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
		} else {
			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
			return errors.New("invalid-consumer")
		}
	}
	return nil
}

// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	var pConsumers []sarama.PartitionConsumer
	var err error

	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	consumersIf := make([]interface{}, 0)
	for _, pConsumer := range pConsumers {
		consumersIf = append(consumersIf, pConsumer)
	}

	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: consumersIf,
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	// TODO: Replace this development partition consumer with a group consumer
	var pConsumer *scc.Consumer
	var err error
	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: []interface{}{pConsumer},
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	pConsumers := make([]sarama.PartitionConsumer, 0)
	for _, partition := range partitionList {
		var pConsumer sarama.PartitionConsumer
		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
			log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
		pConsumers = append(pConsumers, pConsumer)
	}
	return pConsumers, nil
}

func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
	var i int
	var channel chan *ic.InterContainerMessage
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			log.Debug("channel-closed")
			return channels[:len(channels)-1]
		}
	}
	return channels
}

func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; !exist {
		sc.groupConsumers[topic] = consumer
	}
}

func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; exist {
		consumer := sc.groupConsumers[topic]
		delete(sc.groupConsumers, topic)
		if err := consumer.Close(); err != nil {
			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
			return err
		}
	}
	return nil
}