/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"errors"
	"fmt"
	scc "github.com/bsm/sarama-cluster"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-go/common/log"
	ic "github.com/opencord/voltha-protos/go/inter_container"
	"gopkg.in/Shopify/sarama.v1"
	"strings"
	"sync"
	"time"
)

func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}

type returnErrorFunction func() error

// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ic.InterContainerMessage
}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumers                map[string]*scc.Consumer
	lockOfGroupConsumers          sync.RWMutex
	consumerGroupPrefix           string
	consumerType                  int
	consumerGroupName             string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
}

type SaramaClientOption func(*SaramaClient)

func Host(host string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaHost = host
	}
}

func Port(port int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaPort = port
	}
}

func ConsumerGroupPrefix(prefix string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupPrefix = prefix
	}
}

func ConsumerGroupName(name string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerGroupName = name
	}
}

func ConsumerType(consumer int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerType = consumer
	}
}

func ProducerFlushFrequency(frequency int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushFrequency = frequency
	}
}

func ProducerFlushMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMessages = num
	}
}

func ProducerFlushMaxMessages(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerFlushMaxmessages = num
	}
}

func ProducerMaxRetries(num int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryMax = num
	}
}

func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerRetryBackOff = duration
	}
}

func ProducerReturnOnErrors(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnErrors = opt
	}
}

func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.producerReturnSuccess = opt
	}
}

func ConsumerMaxWait(wait int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.consumerMaxwait = wait
	}
}

func MaxProcessingTime(pTime int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.maxProcessingTime = pTime
	}
}

func NumPartitions(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numPartitions = number
	}
}

func NumReplicas(number int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.numReplicas = number
	}
}

func AutoCreateTopic(opt bool) SaramaClientOption {
	return func(args *SaramaClient) {
		args.autoCreateTopic = opt
	}
}

func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaHost: DefaultKafkaHost,
		KafkaPort: DefaultKafkaPort,
	}
	client.consumerType = DefaultConsumerType
	client.producerFlushFrequency = DefaultProducerFlushFrequency
	client.producerFlushMessages = DefaultProducerFlushMessages
	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
	client.producerReturnErrors = DefaultProducerReturnErrors
	client.producerReturnSuccess = DefaultProducerReturnSuccess
	client.producerRetryMax = DefaultProducerRetryMax
	client.producerRetryBackOff = DefaultProducerRetryBackoff
	client.consumerMaxwait = DefaultConsumerMaxwait
	client.maxProcessingTime = DefaultMaxProcessingTime
	client.numPartitions = DefaultNumberPartitions
	client.numReplicas = DefaultNumberReplicas
	client.autoCreateTopic = DefaultAutoCreateTopic

	for _, option := range opts {
		option(client)
	}

	client.groupConsumers = make(map[string]*scc.Consumer)

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
	client.topicLockMap = make(map[string]*sync.RWMutex)
	client.lockOfTopicLockMap = sync.RWMutex{}
	client.lockOfGroupConsumers = sync.RWMutex{}
	return client
}
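
// Illustrative sketch (not part of the client): constructing a client with functional options.
// The broker address and the specific option values below are assumptions; any option that is
// not supplied keeps the package default assigned in NewSaramaClient above.
//
//	client := NewSaramaClient(
//		Host("10.100.198.220"),
//		Port(9092),
//		ConsumerType(GroupCustomer),
//		ProducerReturnOnErrors(true),
//		ProducerReturnOnSuccess(true),
//		AutoCreateTopic(true),
//	)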

func (sc *SaramaClient) Start() error {
	log.Info("Starting-kafka-sarama-client")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	var err error

	// Create the Cluster Admin
	if err = sc.createClusterAdmin(); err != nil {
		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
		return err
	}

	// Create the Publisher
	if err := sc.createPublisher(); err != nil {
		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	if sc.consumerType == DefaultConsumerType {
		// Create the master consumers
		if err := sc.createConsumer(); err != nil {
			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
			return err
		}
	}

	// Create the topic to consumers/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	log.Info("kafka-sarama-client-started")

	return nil
}

func (sc *SaramaClient) Stop() {
	log.Info("stopping-sarama-client")

	// Send a message over the done channel to close all long-running routines
	sc.doneCh <- 1

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			log.Errorw("closing-producer-failed", log.Fields{"error": err})
		}
	}

	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
		}
	}

	for key, val := range sc.groupConsumers {
		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
		if err := val.Close(); err != nil {
			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
		}
	}

	if sc.cAdmin != nil {
		if err := sc.cAdmin.Close(); err != nil {
			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
		}
	}

	//TODO: Clear the consumers map
	//sc.clearConsumerChannelMap()

	log.Info("sarama-client-stopped")
}
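
// Minimal start/stop lifecycle sketch, assuming a reachable broker at KafkaHost:KafkaPort.
// Stop is typically deferred so the producer, consumers and cluster admin are closed on exit.
//
//	if err := client.Start(); err != nil {
//		log.Errorw("client-start-failed", log.Fields{"error": err})
//		return err
//	}
//	defer client.Stop()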

// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
// the invoking function must hold the lock.
func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
	// Set the topic details
	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)
	topicDetails := make(map[string]*sarama.TopicDetail)
	topicDetails[topic.Name] = topicDetail

	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
		if err == sarama.ErrTopicAlreadyExists {
			// Not an error
			log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("create-topic-failure", log.Fields{"error": err})
		return err
	}
	// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
	// do so.
	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}

// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
// ensure no two go routines are performing operations on the same topic.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	return sc.createTopic(topic, numPartition, repFactor)
}

// DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	// Remove the topic from the broker
	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
		if err == sarama.ErrUnknownTopicOrPartition {
			// Not an error as the topic does not exist
			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
			return nil
		}
		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
		return err
	}

	// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic.
func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific to that consumer and add it to the consumer channel map
		ch := make(chan *ic.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan *ic.InterContainerMessage
	var err error

	// Use the consumerType option to figure out the type of consumer to launch
	if sc.consumerType == PartitionConsumer {
		if sc.autoCreateTopic {
			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
				return nil, err
			}
		}
		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
	} else if sc.consumerType == GroupCustomer {
		// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
		// does not consume from a precreated topic in some scenarios
		//if sc.autoCreateTopic {
		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
		//		return nil, err
		//	}
		//}
		//groupId := sc.consumerGroupName
		groupId := getGroupId(kvArgs...)
		// Include the group prefix
		if groupId != "" {
			groupId = sc.consumerGroupPrefix + groupId
		} else {
			// Need to use a unique group Id per topic
			groupId = sc.consumerGroupPrefix + topic.Name
		}
		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
			return nil, err
		}

	} else {
		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
		return nil, errors.New("unknown-consumer-type")
	}

	return consumerListeningChannel, nil
}
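
// Subscribe usage, as an illustrative sketch only; the topic name and group id below are
// assumptions. GroupIdKey and Offset are the KVArg keys read by getGroupId and getOffset below.
//
//	ch, err := client.Subscribe(
//		&Topic{Name: "rw_core"},
//		&KVArg{Key: GroupIdKey, Value: "my-consumer-group"},
//		&KVArg{Key: Offset, Value: sarama.OffsetOldest},
//	)
//	if err != nil {
//		return err
//	}
//	go func() {
//		for msg := range ch {
//			log.Debugw("inter-container-message", log.Fields{"msg": msg})
//		}
//	}()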

// UnSubscribe unsubscribes a consumer from a given topic
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopic(topic)
	defer sc.unLockTopic(topic)

	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	var err error
	if removeErr := sc.removeChannelFromConsumerChannelMap(*topic, ch); removeErr != nil {
		log.Errorw("failed-removing-channel", log.Fields{"error": removeErr})
		err = removeErr
	}
	if deleteErr := sc.deleteFromGroupConsumers(topic.Name); deleteErr != nil {
		log.Errorw("failed-deleting-group-consumer", log.Fields{"error": deleteErr})
		err = deleteErr
	}
	return err
}

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
		return fmt.Errorf("not-a-proto-msg-%s", msg)
	}

	var marshalled []byte
	var err error
	// Create the Sarama producer message
	if marshalled, err = proto.Marshal(protoMsg); err != nil {
		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
		return err
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg

	// Wait for result
	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
	select {
	case ok := <-sc.producer.Successes():
		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
	case notOk := <-sc.producer.Errors():
		log.Debugw("error-sending", log.Fields{"status": notOk})
		return notOk
	}
	return nil
}
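
// Send usage, as a hedged sketch. The payload must satisfy proto.Message; the empty
// InterContainerMessage, the topic name and the partition key below are assumptions for illustration.
//
//	msg := &ic.InterContainerMessage{}
//	if err := client.Send(msg, &Topic{Name: "rw_core"}, "device-1234"); err != nil {
//		log.Errorw("send-failed", log.Fields{"error": err})
//	}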

// getGroupId returns the group id from the key-value args.
func getGroupId(kvArgs ...*KVArg) string {
	for _, arg := range kvArgs {
		if arg.Key == GroupIdKey {
			return arg.Value.(string)
		}
	}
	return ""
}

// getOffset returns the offset from the key-value args.
func getOffset(kvArgs ...*KVArg) int64 {
	for _, arg := range kvArgs {
		if arg.Key == Offset {
			return arg.Value.(int64)
		}
	}
	return sarama.OffsetNewest
}

func (sc *SaramaClient) createClusterAdmin() error {
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Create a cluster Admin
	var cAdmin sarama.ClusterAdmin
	var err error
	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
		return err
	}
	sc.cAdmin = cAdmin
	return nil
}

// lockTopic acquires the lock dedicated to the given topic, creating that lock on first use.
func (sc *SaramaClient) lockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	} else {
		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
		sc.lockOfTopicLockMap.Unlock()
		sc.topicLockMap[topic.Name].Lock()
	}
}

// unLockTopic releases the per-topic lock, if one exists for the given topic.
func (sc *SaramaClient) unLockTopic(topic *Topic) {
	sc.lockOfTopicLockMap.Lock()
	defer sc.lockOfTopicLockMap.Unlock()
	if _, exist := sc.topicLockMap[topic.Name]; exist {
		sc.topicLockMap[topic.Name].Unlock()
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
		delete(sc.topicToConsumerChannelMap, id)
	}
}

func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}

// closeConsumers closes a list of sarama consumers. The consumers can be either partition consumers or group consumers.
func closeConsumers(consumers []interface{}) error {
	var err error
	for _, consumer := range consumers {
		// Is it a partition consumer?
		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			if errTemp := partitionConsumer.Close(); errTemp != nil {
				log.Debugw("partition-consumer-close-error", log.Fields{"err": errTemp})
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
			if errTemp := groupConsumer.Close(); errTemp != nil {
				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
					// This can occur on race condition
					err = nil
				} else {
					err = errTemp
				}
			}
		}
	}
	return err
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(consumerCh.channels, ch)
		// If there are no more channels then we can close the consumers themselves
		if len(consumerCh.channels) == 0 {
			log.Debugw("closing-consumers", log.Fields{"topic": topic})
			err := closeConsumers(consumerCh.consumers)
			//err := consumerCh.consumers.Close()
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		for _, ch := range consumerCh.channels {
			// Channel will be closed in the removeChannel method
			removeChannel(consumerCh.channels, ch)
		}
		err := closeConsumers(consumerCh.consumers)
		//if err == sarama.ErrUnknownTopicOrPartition {
		//	// Not an error
		//	err = nil
		//}
		//err := consumerCh.consumers.Close()
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return nil
}

func (sc *SaramaClient) clearConsumerChannelMap() error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	var err error
	for topic, consumerCh := range sc.topicToConsumerChannelMap {
		for _, ch := range consumerCh.channels {
			// Channel will be closed in the removeChannel method
			removeChannel(consumerCh.channels, ch)
		}
		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
			err = errTemp
		}
		//err = consumerCh.consumers.Close()
		delete(sc.topicToConsumerChannelMap, topic)
	}
	return err
}

// createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
	// This Creates the publisher
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
	config.Producer.Flush.Messages = sc.producerFlushMessages
	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
	config.Producer.Return.Errors = sc.producerReturnErrors
	config.Producer.Return.Successes = sc.producerReturnSuccess
	//config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.RequiredAcks = sarama.WaitForLocal

	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
		log.Errorw("error-starting-publisher", log.Fields{"error": err})
		return err
	} else {
		sc.producer = producer
	}
	log.Info("Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer() error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
		log.Errorw("error-starting-consumers", log.Fields{"error": err})
		return err
	} else {
		sc.consumer = consumer
	}
	log.Info("Kafka-consumers-created")
	return nil
}

// createGroupConsumer creates a consumer group
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.ClientID = uuid.New().String()
	config.Group.Mode = scc.ConsumerModeMultiplex
	//config.Consumer.Return.Errors = true
	//config.Group.Return.Notifications = false
	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = initialOffset
	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
		return nil, err
	}
	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})

	//sc.groupConsumers[topic.Name] = consumer
	sc.addToGroupConsumers(topic.Name, consumer)
	return consumer, nil
}

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription.
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.RLock()
	defer sc.lockTopicToConsumerChannelMap.RUnlock()
	for _, ch := range consumerCh.channels {
		go func(c chan *ic.InterContainerMessage) {
			c <- protoMessage
		}(ch)
	}
}

func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("partition-consumers-error", log.Fields{"error": err})
			} else {
				// Channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
			if !ok {
				// channel is closed
				break startloop
			}
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("partition-invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
		case <-sc.doneCh:
			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})

startloop:
	for {
		select {
		case err, ok := <-consumer.Errors():
			if ok {
				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
			} else {
				// channel is closed
				break startloop
			}
		case msg, ok := <-consumer.Messages():
			if !ok {
				// Channel closed
				break startloop
			}
			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
			msgBody := msg.Value
			icm := &ic.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerChnls, icm)
			consumer.MarkOffset(msg, "")
		case ntf := <-consumer.Notifications():
			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
		case <-sc.doneCh:
			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) startConsumers(topic *Topic) error {
	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
		return errors.New("consumers-not-exist")
	}
	// For each consumer listening for that topic, start a consumption loop
	for _, consumer := range consumerCh.consumers {
		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
		} else {
			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
			return errors.New("invalid-consumer")
		}
	}
	return nil
}

// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	var pConsumers []sarama.PartitionConsumer
	var err error

	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	consumersIf := make([]interface{}, 0)
	for _, pConsumer := range pConsumers {
		consumersIf = append(consumersIf, pConsumer)
	}

	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: consumersIf,
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the
// consumerChannels map for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
	// TODO: Replace this development partition consumers with a group consumers
	var pConsumer *scc.Consumer
	var err error
	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
		log.Errorw("creating-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ic.InterContainerMessage)
	cc := &consumerChannels{
		consumers: []interface{}{pConsumer},
		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumers channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	// Start a consumer to listen on that specific topic
	go sc.startConsumers(topic)

	return consumerListeningChannel, nil
}

func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	pConsumers := make([]sarama.PartitionConsumer, 0)
	for _, partition := range partitionList {
		var pConsumer sarama.PartitionConsumer
		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
			log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
			return nil, err
		}
		pConsumers = append(pConsumers, pConsumer)
	}
	return pConsumers, nil
}

func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
	var i int
	var channel chan *ic.InterContainerMessage
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			log.Debug("channel-closed")
			return channels[:len(channels)-1]
		}
	}
	return channels
}

func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; !exist {
		sc.groupConsumers[topic] = consumer
	}
}

func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
	sc.lockOfGroupConsumers.Lock()
	defer sc.lockOfGroupConsumers.Unlock()
	if _, exist := sc.groupConsumers[topic]; exist {
		consumer := sc.groupConsumers[topic]
		delete(sc.groupConsumers, topic)
		if err := consumer.Close(); err != nil {
			log.Errorw("failure-closing-consumer", log.Fields{"error": err})
			return err
		}
	}
	return nil
}