blob: f2de01a5beded008db335cd44f8ba506dee76180 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
25 ca "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
// init registers this package with the log package, using the log.JSON format
// and log.WarnLevel as its level.
func init() {
	log.AddPackage(log.JSON, log.WarnLevel, nil)
}
35
// returnErrorFunction is a function that takes no arguments and returns an error.
// NOTE(review): not referenced in this file; presumably used elsewhere in the package.
type returnErrorFunction func() error
37
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
//
// consumers holds sarama.PartitionConsumer or *scc.Consumer values (the two types handled by closeConsumers and
// startConsumers); channels holds one channel per subscriber, each handed out by Subscribe.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ca.InterContainerMessage
}
45
// SaramaClient represents the messaging proxy.
//
// Start creates cAdmin (createClusterAdmin), producer (createPublisher) and the
// master partition consumer (createConsumer). groupConsumer is set by
// createGroupConsumer when Subscribe creates a group consumer. client and
// groupName are not assigned anywhere in this file. The tuning fields
// (consumerType through autoCreateTopic) receive their defaults in
// NewSaramaClient and can be overridden with SaramaClientOption values.
// doneCh is the exit signal the consumption loops select on, and
// topicToConsumerChannelMap, guarded by lockTopicToConsumerChannelMap, maps a
// topic name to the consumers and subscriber channels for that topic.
type SaramaClient struct {
	cAdmin                        sarama.ClusterAdmin
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumer                 *scc.Consumer
	consumerType                  int
	groupName                     string
	producerFlushFrequency        int
	producerFlushMessages         int
	producerFlushMaxmessages      int
	producerRetryMax              int
	producerRetryBackOff          time.Duration
	producerReturnSuccess         bool
	producerReturnErrors          bool
	consumerMaxwait               int
	maxProcessingTime             int
	numPartitions                 int
	numReplicas                   int
	autoCreateTopic               bool
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
}
73
74type SaramaClientOption func(*SaramaClient)
75
76func Host(host string) SaramaClientOption {
77 return func(args *SaramaClient) {
78 args.KafkaHost = host
79 }
80}
81
82func Port(port int) SaramaClientOption {
83 return func(args *SaramaClient) {
84 args.KafkaPort = port
85 }
86}
87
khenaidoo4c1a5bf2018-11-29 15:53:42 -050088func ConsumerType(consumer int) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.consumerType = consumer
91 }
92}
93
94func ProducerFlushFrequency(frequency int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.producerFlushFrequency = frequency
97 }
98}
99
100func ProducerFlushMessages(num int) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.producerFlushMessages = num
103 }
104}
105
106func ProducerFlushMaxMessages(num int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.producerFlushMaxmessages = num
109 }
110}
111
112func ReturnOnErrors(opt bool) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerReturnErrors = opt
115 }
116}
117
118func ReturnOnSuccess(opt bool) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerReturnSuccess = opt
121 }
122}
123
124func ConsumerMaxWait(wait int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.consumerMaxwait = wait
127 }
128}
129
130func MaxProcessingTime(pTime int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.maxProcessingTime = pTime
133 }
134}
135
136func NumPartitions(number int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.numPartitions = number
139 }
140}
141
142func NumReplicas(number int) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.numReplicas = number
145 }
146}
147
148func AutoCreateTopic(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.autoCreateTopic = opt
151 }
152}
153
khenaidoo43c82122018-11-22 18:38:28 -0500154func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
155 client := &SaramaClient{
156 KafkaHost: DefaultKafkaHost,
157 KafkaPort: DefaultKafkaPort,
158 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500159 client.consumerType = DefaultConsumerType
160 client.producerFlushFrequency = DefaultProducerFlushFrequency
161 client.producerFlushMessages = DefaultProducerFlushMessages
162 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
163 client.producerReturnErrors = DefaultProducerReturnErrors
164 client.producerReturnSuccess = DefaultProducerReturnSuccess
165 client.producerRetryMax = DefaultProducerRetryMax
166 client.producerRetryBackOff = DefaultProducerRetryBackoff
167 client.consumerMaxwait = DefaultConsumerMaxwait
168 client.maxProcessingTime = DefaultMaxProcessingTime
169 client.numPartitions = DefaultNumberPartitions
170 client.numReplicas = DefaultNumberReplicas
171 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500172
173 for _, option := range opts {
174 option(client)
175 }
176
177 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
178
179 return client
180}
181
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500182func (sc *SaramaClient) Start() error {
183 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500184
185 // Create the Done channel
186 sc.doneCh = make(chan int, 1)
187
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500188 var err error
189
190 // Create the Cluster Admin
191 if err = sc.createClusterAdmin(); err != nil {
192 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
193 return err
194 }
195
khenaidoo43c82122018-11-22 18:38:28 -0500196 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500197 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500198 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
199 return err
200 }
201
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500202 // Create the master consumers
203 if err := sc.createConsumer(); err != nil {
204 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500205 return err
206 }
207
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500208 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500209 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
210
211 return nil
212}
213
214func (sc *SaramaClient) Stop() {
215 log.Info("stopping-sarama-client")
216
217 //Send a message over the done channel to close all long running routines
218 sc.doneCh <- 1
219
khenaidoo43c82122018-11-22 18:38:28 -0500220 if sc.producer != nil {
221 if err := sc.producer.Close(); err != nil {
222 panic(err)
223 }
224 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500225
khenaidoo43c82122018-11-22 18:38:28 -0500226 if sc.consumer != nil {
227 if err := sc.consumer.Close(); err != nil {
228 panic(err)
229 }
230 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500231
232 if sc.groupConsumer != nil {
233 if err := sc.groupConsumer.Close(); err != nil {
234 panic(err)
235 }
236 }
237
238 if sc.cAdmin != nil {
239 if err := sc.cAdmin.Close(); err != nil {
240 panic(err)
241 }
242 }
243
244 //TODO: Clear the consumers map
245 sc.clearConsumerChannelMap()
246
247 log.Info("sarama-client-stopped")
248}
249
250//CreateTopic creates a topic on the Kafka Broker.
251func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
252 // Set the topic details
253 topicDetail := &sarama.TopicDetail{}
254 topicDetail.NumPartitions = int32(numPartition)
255 topicDetail.ReplicationFactor = int16(repFactor)
256 topicDetail.ConfigEntries = make(map[string]*string)
257 topicDetails := make(map[string]*sarama.TopicDetail)
258 topicDetails[topic.Name] = topicDetail
259
260 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
261 if err == sarama.ErrTopicAlreadyExists {
262 // Not an error
263 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
264 return nil
265 }
266 log.Errorw("create-topic-failure", log.Fields{"error": err})
267 return err
268 }
269 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
270 // do so.
271 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
272 return nil
273}
274
275//DeleteTopic removes a topic from the kafka Broker
276func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
277 // Remove the topic from the broker
278 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
279 if err == sarama.ErrUnknownTopicOrPartition {
280 // Not an error as does not exist
281 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
282 return nil
283 }
284 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
285 return err
286 }
287
288 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
289 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
290 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
291 return err
292 }
293 return nil
294}
295
296// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
297// messages from that topic
298func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
299 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
300
301 // If a consumers already exist for that topic then resuse it
302 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
303 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
304 // Create a channel specific for that consumers and add it to the consumers channel map
305 ch := make(chan *ca.InterContainerMessage)
306 sc.addChannelToConsumerChannelMap(topic, ch)
307 return ch, nil
308 }
309
310 // Register for the topic and set it up
311 var consumerListeningChannel chan *ca.InterContainerMessage
312 var err error
313
314 // Use the consumerType option to figure out the type of consumer to launch
315 if sc.consumerType == PartitionConsumer {
316 if sc.autoCreateTopic {
317 if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
318 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
319 return nil, err
320 }
321 }
322 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
323 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
324 return nil, err
325 }
326 } else if sc.consumerType == GroupCustomer {
327 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
328 // does not consume from a precreated topic in some scenarios
329 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
330 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
331 return nil, err
332 }
333 } else {
334 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
335 return nil, errors.New("unknown-consumer-type")
336 }
337
338 return consumerListeningChannel, nil
339}
340
341//UnSubscribe unsubscribe a consumer from a given topic
342func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
343 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
344 err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
345 return err
346}
347
348// send formats and sends the request onto the kafka messaging bus.
349func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
350
351 // Assert message is a proto message
352 var protoMsg proto.Message
353 var ok bool
354 // ascertain the value interface type is a proto.Message
355 if protoMsg, ok = msg.(proto.Message); !ok {
356 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
357 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
358 }
359
360 var marshalled []byte
361 var err error
362 // Create the Sarama producer message
363 if marshalled, err = proto.Marshal(protoMsg); err != nil {
364 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
365 return err
366 }
367 key := ""
368 if len(keys) > 0 {
369 key = keys[0] // Only the first key is relevant
370 }
371 kafkaMsg := &sarama.ProducerMessage{
372 Topic: topic.Name,
373 Key: sarama.StringEncoder(key),
374 Value: sarama.ByteEncoder(marshalled),
375 }
376
377 // Send message to kafka
378 sc.producer.Input() <- kafkaMsg
379 return nil
380}
381
382func (sc *SaramaClient) createClusterAdmin() error {
383 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
384 config := sarama.NewConfig()
385 config.Version = sarama.V1_0_0_0
386
387 // Create a cluster Admin
388 var cAdmin sarama.ClusterAdmin
389 var err error
390 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
391 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
392 return err
393 }
394 sc.cAdmin = cAdmin
395 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500396}
397
398func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
399 sc.lockTopicToConsumerChannelMap.Lock()
400 defer sc.lockTopicToConsumerChannelMap.Unlock()
401 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
402 sc.topicToConsumerChannelMap[id] = arg
403 }
404}
405
406func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
407 sc.lockTopicToConsumerChannelMap.Lock()
408 defer sc.lockTopicToConsumerChannelMap.Unlock()
409 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
410 delete(sc.topicToConsumerChannelMap, id)
411 }
412}
413
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500414func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500415 sc.lockTopicToConsumerChannelMap.Lock()
416 defer sc.lockTopicToConsumerChannelMap.Unlock()
417
418 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
419 return consumerCh
420 }
421 return nil
422}
423
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500424func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500425 sc.lockTopicToConsumerChannelMap.Lock()
426 defer sc.lockTopicToConsumerChannelMap.Unlock()
427 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
428 consumerCh.channels = append(consumerCh.channels, ch)
429 return
430 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500431 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
432}
433
434//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
435func closeConsumers(consumers []interface{}) error {
436 var err error
437 for _, consumer := range consumers {
438 // Is it a partition consumers?
439 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
440 if errTemp := partionConsumer.Close(); errTemp != nil {
441 log.Debugw("partition!!!", log.Fields{"err": errTemp})
442 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
443 // This can occur on race condition
444 err = nil
445 } else {
446 err = errTemp
447 }
448 }
449 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
450 if errTemp := groupConsumer.Close(); errTemp != nil {
451 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
452 // This can occur on race condition
453 err = nil
454 } else {
455 err = errTemp
456 }
457 }
458 }
459 }
460 return err
khenaidoo43c82122018-11-22 18:38:28 -0500461}
462
463func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
464 sc.lockTopicToConsumerChannelMap.Lock()
465 defer sc.lockTopicToConsumerChannelMap.Unlock()
466 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
467 // Channel will be closed in the removeChannel method
468 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500469 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500470 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500471 log.Debugw("closing-consumers", log.Fields{"topic": topic})
472 err := closeConsumers(consumerCh.consumers)
473 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500474 delete(sc.topicToConsumerChannelMap, topic.Name)
475 return err
476 }
477 return nil
478 }
479 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
480 return errors.New("topic-does-not-exist")
481}
482
483func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
484 sc.lockTopicToConsumerChannelMap.Lock()
485 defer sc.lockTopicToConsumerChannelMap.Unlock()
486 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
487 for _, ch := range consumerCh.channels {
488 // Channel will be closed in the removeChannel method
489 removeChannel(consumerCh.channels, ch)
490 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500491 err := closeConsumers(consumerCh.consumers)
492 //if err == sarama.ErrUnknownTopicOrPartition {
493 // // Not an error
494 // err = nil
495 //}
496 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500497 delete(sc.topicToConsumerChannelMap, topic.Name)
498 return err
499 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500500 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
501 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500502}
503
504func (sc *SaramaClient) clearConsumerChannelMap() error {
505 sc.lockTopicToConsumerChannelMap.Lock()
506 defer sc.lockTopicToConsumerChannelMap.Unlock()
507 var err error
508 for topic, consumerCh := range sc.topicToConsumerChannelMap {
509 for _, ch := range consumerCh.channels {
510 // Channel will be closed in the removeChannel method
511 removeChannel(consumerCh.channels, ch)
512 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500513 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
514 err = errTemp
515 }
516 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500517 delete(sc.topicToConsumerChannelMap, topic)
518 }
519 return err
520}
521
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500522//createPublisher creates the publisher which is used to send a message onto kafka
523func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500524 // This Creates the publisher
525 config := sarama.NewConfig()
526 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500527 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
528 config.Producer.Flush.Messages = sc.producerFlushMessages
529 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
530 config.Producer.Return.Errors = sc.producerReturnErrors
531 config.Producer.Return.Successes = sc.producerReturnSuccess
532 //config.Producer.RequiredAcks = sarama.WaitForAll
533 config.Producer.RequiredAcks = sarama.WaitForLocal
534
khenaidoo43c82122018-11-22 18:38:28 -0500535 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
536 brokers := []string{kafkaFullAddr}
537
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500538 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
539 log.Errorw("error-starting-publisher", log.Fields{"error": err})
540 return err
541 } else {
542 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500543 }
544 log.Info("Kafka-publisher-created")
545 return nil
546}
547
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500548func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500549 config := sarama.NewConfig()
550 config.Consumer.Return.Errors = true
551 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500552 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
553 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500554 config.Consumer.Offsets.Initial = sarama.OffsetNewest
555 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
556 brokers := []string{kafkaFullAddr}
557
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500558 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
559 log.Errorw("error-starting-consumers", log.Fields{"error": err})
560 return err
561 } else {
562 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500563 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500564 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500565 return nil
566}
567
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500568// createGroupConsumer creates a consumers group
khenaidoo43c82122018-11-22 18:38:28 -0500569func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
570 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500571 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500572 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500573 //config.Consumer.Return.Errors = true
574 //config.Group.Return.Notifications = false
575 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
576 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500577 config.Consumer.Offsets.Initial = sarama.OffsetNewest
578 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
579 brokers := []string{kafkaFullAddr}
580
581 if groupId == nil {
582 g := DefaultGroupName
583 groupId = &g
584 }
585 topics := []string{topic.Name}
586 var consumer *scc.Consumer
587 var err error
588
khenaidoo43c82122018-11-22 18:38:28 -0500589 if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500590 log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500591 return nil, err
592 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500593 log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500594 //time.Sleep(10*time.Second)
595 sc.groupConsumer = consumer
596 return consumer, nil
597}
598
khenaidoo43c82122018-11-22 18:38:28 -0500599// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
600// topic via the unique channel each subsciber received during subscription
601func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
602 // Need to go over all channels and publish messages to them - do we need to copy msg?
603 sc.lockTopicToConsumerChannelMap.Lock()
604 defer sc.lockTopicToConsumerChannelMap.Unlock()
605 for _, ch := range consumerCh.channels {
606 go func(c chan *ca.InterContainerMessage) {
607 c <- protoMessage
608 }(ch)
609 }
610}
611
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500612func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
613 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500614startloop:
615 for {
616 select {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500617 case err := <-consumer.Errors():
618 if err != nil {
619 log.Warnw("partition-consumers-error", log.Fields{"error": err})
620 } else {
621 // There is a race condition when this loop is stopped and the consumer is closed where
622 // the actual error comes as nil
623 log.Warn("partition-consumers-error")
624 }
625 case msg := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500626 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500627 if msg == nil {
628 // There is a race condition when this loop is stopped and the consumer is closed where
629 // the actual msg comes as nil
630 break startloop
631 }
632 msgBody := msg.Value
633 icm := &ca.InterContainerMessage{}
634 if err := proto.Unmarshal(msgBody, icm); err != nil {
635 log.Warnw("partition-invalid-message", log.Fields{"error": err})
636 continue
637 }
638 go sc.dispatchToConsumers(consumerChnls, icm)
639 case <-sc.doneCh:
640 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
641 break startloop
642 }
643 }
644 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
645}
646
647func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
648 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
649
650startloop:
651 for {
652 select {
653 case err := <-consumer.Errors():
654 if err != nil {
655 log.Warnw("group-consumers-error", log.Fields{"error": err})
656 } else {
657 // There is a race condition when this loop is stopped and the consumer is closed where
658 // the actual error comes as nil
659 log.Warn("group-consumers-error")
660 }
661 case msg := <-consumer.Messages():
662 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
663 if msg == nil {
664 // There is a race condition when this loop is stopped and the consumer is closed where
665 // the actual msg comes as nil
666 break startloop
667 }
khenaidoo43c82122018-11-22 18:38:28 -0500668 msgBody := msg.Value
669 icm := &ca.InterContainerMessage{}
670 if err := proto.Unmarshal(msgBody, icm); err != nil {
671 log.Warnw("invalid-message", log.Fields{"error": err})
672 continue
673 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 go sc.dispatchToConsumers(consumerChnls, icm)
675 consumer.MarkOffset(msg, "")
676 case ntf := <-consumer.Notifications():
677 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500678 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500679 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500680 break startloop
681 }
682 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500683 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500684}
685
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500686func (sc *SaramaClient) startConsumers(topic *Topic) error {
687 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
688 var consumerCh *consumerChannels
689 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
690 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
691 return errors.New("consumers-not-exist")
692 }
693 // For each consumer listening for that topic, start a consumption loop
694 for _, consumer := range consumerCh.consumers {
695 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
696 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
697 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
698 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
699 } else {
700 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
701 return errors.New("invalid-consumer")
702 }
703 }
704 return nil
705}
706
707//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
708//// for that topic. It also starts the routine that listens for messages on that topic.
709func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
710 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500711 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500712
713 if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
714 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500715 return nil, err
716 }
717
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500718 consumersIf := make([]interface{}, 0)
719 for _, pConsumer := range pConsumers {
720 consumersIf = append(consumersIf, pConsumer)
721 }
722
723 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500724 // unbuffered to verify race conditions.
725 consumerListeningChannel := make(chan *ca.InterContainerMessage)
726 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500727 consumers: consumersIf,
728 channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500729 }
730
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500731 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500732 sc.addTopicToConsumerChannelMap(topic.Name, cc)
733
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500734 //Start a consumers to listen on that specific topic
735 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500736
737 return consumerListeningChannel, nil
738}
739
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500740// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
741// for that topic. It also starts the routine that listens for messages on that topic.
742func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
743 // TODO: Replace this development partition consumers with a group consumers
744 var pConsumer *scc.Consumer
745 var err error
746 if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
747 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
748 return nil, err
749 }
750 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
751 // unbuffered to verify race conditions.
752 consumerListeningChannel := make(chan *ca.InterContainerMessage)
753 cc := &consumerChannels{
754 consumers: []interface{}{pConsumer},
755 channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
756 }
757
758 // Add the consumers channel to the map
759 sc.addTopicToConsumerChannelMap(topic.Name, cc)
760
761 //Start a consumers to listen on that specific topic
762 go sc.startConsumers(topic)
763
764 return consumerListeningChannel, nil
765}
766
767func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
768 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500769 partitionList, err := sc.consumer.Partitions(topic.Name)
770 if err != nil {
771 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
772 return nil, err
773 }
774
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500775 pConsumers := make([]sarama.PartitionConsumer, 0)
776 for _, partition := range partitionList {
777 var pConsumer sarama.PartitionConsumer
778 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
779 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
780 return nil, err
781 }
782 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500783 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500784 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500785}
786
787func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
788 var i int
789 var channel chan *ca.InterContainerMessage
790 for i, channel = range channels {
791 if channel == ch {
792 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
793 close(channel)
794 return channels[:len(channels)-1]
795 }
796 }
797 return channels
798}