blob: e330b852a4fb494acf5fbaf963b542d12a09e59d [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
33 log.AddPackage(log.JSON, log.WarnLevel, nil)
34}
35
36type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumer *scc.Consumer
khenaidoo4c1a5bf2018-11-29 15:53:42 -050055 consumerType int
56 groupName string
57 producerFlushFrequency int
58 producerFlushMessages int
59 producerFlushMaxmessages int
60 producerRetryMax int
61 producerRetryBackOff time.Duration
62 producerReturnSuccess bool
63 producerReturnErrors bool
64 consumerMaxwait int
65 maxProcessingTime int
66 numPartitions int
67 numReplicas int
68 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050069 doneCh chan int
70 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72}
73
74type SaramaClientOption func(*SaramaClient)
75
76func Host(host string) SaramaClientOption {
77 return func(args *SaramaClient) {
78 args.KafkaHost = host
79 }
80}
81
82func Port(port int) SaramaClientOption {
83 return func(args *SaramaClient) {
84 args.KafkaPort = port
85 }
86}
87
khenaidoo4c1a5bf2018-11-29 15:53:42 -050088func ConsumerType(consumer int) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.consumerType = consumer
91 }
92}
93
94func ProducerFlushFrequency(frequency int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.producerFlushFrequency = frequency
97 }
98}
99
100func ProducerFlushMessages(num int) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.producerFlushMessages = num
103 }
104}
105
106func ProducerFlushMaxMessages(num int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.producerFlushMaxmessages = num
109 }
110}
111
khenaidoo90847922018-12-03 14:47:51 -0500112func ProducerMaxRetries(num int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerRetryMax = num
115 }
116}
117
118func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerRetryBackOff = duration
121 }
122}
123
124func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500125 return func(args *SaramaClient) {
126 args.producerReturnErrors = opt
127 }
128}
129
khenaidoo90847922018-12-03 14:47:51 -0500130func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500131 return func(args *SaramaClient) {
132 args.producerReturnSuccess = opt
133 }
134}
135
136func ConsumerMaxWait(wait int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.consumerMaxwait = wait
139 }
140}
141
142func MaxProcessingTime(pTime int) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.maxProcessingTime = pTime
145 }
146}
147
148func NumPartitions(number int) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.numPartitions = number
151 }
152}
153
154func NumReplicas(number int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.numReplicas = number
157 }
158}
159
160func AutoCreateTopic(opt bool) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.autoCreateTopic = opt
163 }
164}
165
khenaidoo43c82122018-11-22 18:38:28 -0500166func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
167 client := &SaramaClient{
168 KafkaHost: DefaultKafkaHost,
169 KafkaPort: DefaultKafkaPort,
170 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500171 client.consumerType = DefaultConsumerType
172 client.producerFlushFrequency = DefaultProducerFlushFrequency
173 client.producerFlushMessages = DefaultProducerFlushMessages
174 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
175 client.producerReturnErrors = DefaultProducerReturnErrors
176 client.producerReturnSuccess = DefaultProducerReturnSuccess
177 client.producerRetryMax = DefaultProducerRetryMax
178 client.producerRetryBackOff = DefaultProducerRetryBackoff
179 client.consumerMaxwait = DefaultConsumerMaxwait
180 client.maxProcessingTime = DefaultMaxProcessingTime
181 client.numPartitions = DefaultNumberPartitions
182 client.numReplicas = DefaultNumberReplicas
183 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500184
185 for _, option := range opts {
186 option(client)
187 }
188
189 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
190
191 return client
192}
193
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500194func (sc *SaramaClient) Start() error {
195 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500196
197 // Create the Done channel
198 sc.doneCh = make(chan int, 1)
199
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500200 var err error
201
202 // Create the Cluster Admin
203 if err = sc.createClusterAdmin(); err != nil {
204 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
205 return err
206 }
207
khenaidoo43c82122018-11-22 18:38:28 -0500208 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500209 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500210 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
211 return err
212 }
213
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500214 // Create the master consumers
215 if err := sc.createConsumer(); err != nil {
216 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500217 return err
218 }
219
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500220 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500221 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
222
223 return nil
224}
225
226func (sc *SaramaClient) Stop() {
227 log.Info("stopping-sarama-client")
228
229 //Send a message over the done channel to close all long running routines
230 sc.doneCh <- 1
231
khenaidoo43c82122018-11-22 18:38:28 -0500232 if sc.producer != nil {
233 if err := sc.producer.Close(); err != nil {
234 panic(err)
235 }
236 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500237
khenaidoo43c82122018-11-22 18:38:28 -0500238 if sc.consumer != nil {
239 if err := sc.consumer.Close(); err != nil {
240 panic(err)
241 }
242 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500243
244 if sc.groupConsumer != nil {
245 if err := sc.groupConsumer.Close(); err != nil {
246 panic(err)
247 }
248 }
249
250 if sc.cAdmin != nil {
251 if err := sc.cAdmin.Close(); err != nil {
252 panic(err)
253 }
254 }
255
256 //TODO: Clear the consumers map
257 sc.clearConsumerChannelMap()
258
259 log.Info("sarama-client-stopped")
260}
261
262//CreateTopic creates a topic on the Kafka Broker.
263func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
264 // Set the topic details
265 topicDetail := &sarama.TopicDetail{}
266 topicDetail.NumPartitions = int32(numPartition)
267 topicDetail.ReplicationFactor = int16(repFactor)
268 topicDetail.ConfigEntries = make(map[string]*string)
269 topicDetails := make(map[string]*sarama.TopicDetail)
270 topicDetails[topic.Name] = topicDetail
271
272 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
273 if err == sarama.ErrTopicAlreadyExists {
274 // Not an error
275 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
276 return nil
277 }
278 log.Errorw("create-topic-failure", log.Fields{"error": err})
279 return err
280 }
281 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
282 // do so.
283 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
284 return nil
285}
286
287//DeleteTopic removes a topic from the kafka Broker
288func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
289 // Remove the topic from the broker
290 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
291 if err == sarama.ErrUnknownTopicOrPartition {
292 // Not an error as does not exist
293 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
294 return nil
295 }
296 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
297 return err
298 }
299
300 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
301 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
302 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
303 return err
304 }
305 return nil
306}
307
308// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
309// messages from that topic
khenaidoo79232702018-12-04 11:00:41 -0500310func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500311 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
312
313 // If a consumers already exist for that topic then resuse it
314 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
315 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
316 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500317 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500318 sc.addChannelToConsumerChannelMap(topic, ch)
319 return ch, nil
320 }
321
322 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500323 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500324 var err error
325
326 // Use the consumerType option to figure out the type of consumer to launch
327 if sc.consumerType == PartitionConsumer {
328 if sc.autoCreateTopic {
329 if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
330 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
331 return nil, err
332 }
333 }
334 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
335 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
336 return nil, err
337 }
338 } else if sc.consumerType == GroupCustomer {
339 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
340 // does not consume from a precreated topic in some scenarios
341 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
342 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
343 return nil, err
344 }
345 } else {
346 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
347 return nil, errors.New("unknown-consumer-type")
348 }
349
350 return consumerListeningChannel, nil
351}
352
353//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500354func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500355 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
356 err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
357 return err
358}
359
360// send formats and sends the request onto the kafka messaging bus.
361func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
362
363 // Assert message is a proto message
364 var protoMsg proto.Message
365 var ok bool
366 // ascertain the value interface type is a proto.Message
367 if protoMsg, ok = msg.(proto.Message); !ok {
368 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
369 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
370 }
371
372 var marshalled []byte
373 var err error
374 // Create the Sarama producer message
375 if marshalled, err = proto.Marshal(protoMsg); err != nil {
376 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
377 return err
378 }
379 key := ""
380 if len(keys) > 0 {
381 key = keys[0] // Only the first key is relevant
382 }
383 kafkaMsg := &sarama.ProducerMessage{
384 Topic: topic.Name,
385 Key: sarama.StringEncoder(key),
386 Value: sarama.ByteEncoder(marshalled),
387 }
388
389 // Send message to kafka
390 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500391
392 // Wait for result
393 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
394 select {
395 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500396 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500397 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500398 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500399 return notOk
400 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500401 return nil
402}
403
404func (sc *SaramaClient) createClusterAdmin() error {
405 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
406 config := sarama.NewConfig()
407 config.Version = sarama.V1_0_0_0
408
409 // Create a cluster Admin
410 var cAdmin sarama.ClusterAdmin
411 var err error
412 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
413 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
414 return err
415 }
416 sc.cAdmin = cAdmin
417 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500418}
419
420func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
421 sc.lockTopicToConsumerChannelMap.Lock()
422 defer sc.lockTopicToConsumerChannelMap.Unlock()
423 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
424 sc.topicToConsumerChannelMap[id] = arg
425 }
426}
427
428func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
429 sc.lockTopicToConsumerChannelMap.Lock()
430 defer sc.lockTopicToConsumerChannelMap.Unlock()
431 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
432 delete(sc.topicToConsumerChannelMap, id)
433 }
434}
435
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500436func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500437 sc.lockTopicToConsumerChannelMap.Lock()
438 defer sc.lockTopicToConsumerChannelMap.Unlock()
439
440 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
441 return consumerCh
442 }
443 return nil
444}
445
khenaidoo79232702018-12-04 11:00:41 -0500446func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500447 sc.lockTopicToConsumerChannelMap.Lock()
448 defer sc.lockTopicToConsumerChannelMap.Unlock()
449 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
450 consumerCh.channels = append(consumerCh.channels, ch)
451 return
452 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500453 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
454}
455
456//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
457func closeConsumers(consumers []interface{}) error {
458 var err error
459 for _, consumer := range consumers {
460 // Is it a partition consumers?
461 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
462 if errTemp := partionConsumer.Close(); errTemp != nil {
463 log.Debugw("partition!!!", log.Fields{"err": errTemp})
464 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
465 // This can occur on race condition
466 err = nil
467 } else {
468 err = errTemp
469 }
470 }
471 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
472 if errTemp := groupConsumer.Close(); errTemp != nil {
473 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
474 // This can occur on race condition
475 err = nil
476 } else {
477 err = errTemp
478 }
479 }
480 }
481 }
482 return err
khenaidoo43c82122018-11-22 18:38:28 -0500483}
484
khenaidoo79232702018-12-04 11:00:41 -0500485func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500486 sc.lockTopicToConsumerChannelMap.Lock()
487 defer sc.lockTopicToConsumerChannelMap.Unlock()
488 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
489 // Channel will be closed in the removeChannel method
490 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500491 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500492 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500493 log.Debugw("closing-consumers", log.Fields{"topic": topic})
494 err := closeConsumers(consumerCh.consumers)
495 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500496 delete(sc.topicToConsumerChannelMap, topic.Name)
497 return err
498 }
499 return nil
500 }
501 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
502 return errors.New("topic-does-not-exist")
503}
504
505func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
506 sc.lockTopicToConsumerChannelMap.Lock()
507 defer sc.lockTopicToConsumerChannelMap.Unlock()
508 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
509 for _, ch := range consumerCh.channels {
510 // Channel will be closed in the removeChannel method
511 removeChannel(consumerCh.channels, ch)
512 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500513 err := closeConsumers(consumerCh.consumers)
514 //if err == sarama.ErrUnknownTopicOrPartition {
515 // // Not an error
516 // err = nil
517 //}
518 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500519 delete(sc.topicToConsumerChannelMap, topic.Name)
520 return err
521 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500522 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
523 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500524}
525
526func (sc *SaramaClient) clearConsumerChannelMap() error {
527 sc.lockTopicToConsumerChannelMap.Lock()
528 defer sc.lockTopicToConsumerChannelMap.Unlock()
529 var err error
530 for topic, consumerCh := range sc.topicToConsumerChannelMap {
531 for _, ch := range consumerCh.channels {
532 // Channel will be closed in the removeChannel method
533 removeChannel(consumerCh.channels, ch)
534 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500535 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
536 err = errTemp
537 }
538 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500539 delete(sc.topicToConsumerChannelMap, topic)
540 }
541 return err
542}
543
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500544//createPublisher creates the publisher which is used to send a message onto kafka
545func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500546 // This Creates the publisher
547 config := sarama.NewConfig()
548 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500549 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
550 config.Producer.Flush.Messages = sc.producerFlushMessages
551 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
552 config.Producer.Return.Errors = sc.producerReturnErrors
553 config.Producer.Return.Successes = sc.producerReturnSuccess
554 //config.Producer.RequiredAcks = sarama.WaitForAll
555 config.Producer.RequiredAcks = sarama.WaitForLocal
556
khenaidoo43c82122018-11-22 18:38:28 -0500557 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
558 brokers := []string{kafkaFullAddr}
559
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500560 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
561 log.Errorw("error-starting-publisher", log.Fields{"error": err})
562 return err
563 } else {
564 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500565 }
566 log.Info("Kafka-publisher-created")
567 return nil
568}
569
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500570func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500571 config := sarama.NewConfig()
572 config.Consumer.Return.Errors = true
573 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500574 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
575 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500576 config.Consumer.Offsets.Initial = sarama.OffsetNewest
577 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
578 brokers := []string{kafkaFullAddr}
579
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500580 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
581 log.Errorw("error-starting-consumers", log.Fields{"error": err})
582 return err
583 } else {
584 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500585 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500586 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500587 return nil
588}
589
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500590// createGroupConsumer creates a consumers group
khenaidoo43c82122018-11-22 18:38:28 -0500591func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
592 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500593 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500594 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500595 //config.Consumer.Return.Errors = true
596 //config.Group.Return.Notifications = false
597 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
598 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500599 config.Consumer.Offsets.Initial = sarama.OffsetNewest
600 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
601 brokers := []string{kafkaFullAddr}
602
603 if groupId == nil {
604 g := DefaultGroupName
605 groupId = &g
606 }
607 topics := []string{topic.Name}
608 var consumer *scc.Consumer
609 var err error
610
khenaidoo43c82122018-11-22 18:38:28 -0500611 if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500612 log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500613 return nil, err
614 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500615 log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500616 //time.Sleep(10*time.Second)
617 sc.groupConsumer = consumer
618 return consumer, nil
619}
620
khenaidoo43c82122018-11-22 18:38:28 -0500621// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
622// topic via the unique channel each subsciber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500623func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500624 // Need to go over all channels and publish messages to them - do we need to copy msg?
625 sc.lockTopicToConsumerChannelMap.Lock()
626 defer sc.lockTopicToConsumerChannelMap.Unlock()
627 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500628 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500629 c <- protoMessage
630 }(ch)
631 }
632}
633
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500634func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
635 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500636startloop:
637 for {
638 select {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500639 case err := <-consumer.Errors():
640 if err != nil {
641 log.Warnw("partition-consumers-error", log.Fields{"error": err})
642 } else {
643 // There is a race condition when this loop is stopped and the consumer is closed where
644 // the actual error comes as nil
645 log.Warn("partition-consumers-error")
646 }
647 case msg := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500648 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500649 if msg == nil {
650 // There is a race condition when this loop is stopped and the consumer is closed where
651 // the actual msg comes as nil
652 break startloop
653 }
654 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500655 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500656 if err := proto.Unmarshal(msgBody, icm); err != nil {
657 log.Warnw("partition-invalid-message", log.Fields{"error": err})
658 continue
659 }
660 go sc.dispatchToConsumers(consumerChnls, icm)
661 case <-sc.doneCh:
662 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
663 break startloop
664 }
665 }
666 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
667}
668
669func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
670 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
671
672startloop:
673 for {
674 select {
675 case err := <-consumer.Errors():
676 if err != nil {
677 log.Warnw("group-consumers-error", log.Fields{"error": err})
678 } else {
679 // There is a race condition when this loop is stopped and the consumer is closed where
680 // the actual error comes as nil
681 log.Warn("group-consumers-error")
682 }
683 case msg := <-consumer.Messages():
684 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
685 if msg == nil {
686 // There is a race condition when this loop is stopped and the consumer is closed where
687 // the actual msg comes as nil
688 break startloop
689 }
khenaidoo43c82122018-11-22 18:38:28 -0500690 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500691 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500692 if err := proto.Unmarshal(msgBody, icm); err != nil {
693 log.Warnw("invalid-message", log.Fields{"error": err})
694 continue
695 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500696 go sc.dispatchToConsumers(consumerChnls, icm)
697 consumer.MarkOffset(msg, "")
698 case ntf := <-consumer.Notifications():
699 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500700 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500701 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500702 break startloop
703 }
704 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500705 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500706}
707
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500708func (sc *SaramaClient) startConsumers(topic *Topic) error {
709 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
710 var consumerCh *consumerChannels
711 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
712 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
713 return errors.New("consumers-not-exist")
714 }
715 // For each consumer listening for that topic, start a consumption loop
716 for _, consumer := range consumerCh.consumers {
717 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
718 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
719 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
720 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
721 } else {
722 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
723 return errors.New("invalid-consumer")
724 }
725 }
726 return nil
727}
728
729//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
730//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500731func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500732 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500733 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500734
735 if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
736 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500737 return nil, err
738 }
739
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500740 consumersIf := make([]interface{}, 0)
741 for _, pConsumer := range pConsumers {
742 consumersIf = append(consumersIf, pConsumer)
743 }
744
745 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500746 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500747 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500748 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500749 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500750 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500751 }
752
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500753 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500754 sc.addTopicToConsumerChannelMap(topic.Name, cc)
755
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500756 //Start a consumers to listen on that specific topic
757 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500758
759 return consumerListeningChannel, nil
760}
761
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500762// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
763// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500764func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500765 // TODO: Replace this development partition consumers with a group consumers
766 var pConsumer *scc.Consumer
767 var err error
768 if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
769 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
770 return nil, err
771 }
772 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
773 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500774 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500775 cc := &consumerChannels{
776 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500777 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500778 }
779
780 // Add the consumers channel to the map
781 sc.addTopicToConsumerChannelMap(topic.Name, cc)
782
783 //Start a consumers to listen on that specific topic
784 go sc.startConsumers(topic)
785
786 return consumerListeningChannel, nil
787}
788
789func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
790 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500791 partitionList, err := sc.consumer.Partitions(topic.Name)
792 if err != nil {
793 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
794 return nil, err
795 }
796
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500797 pConsumers := make([]sarama.PartitionConsumer, 0)
798 for _, partition := range partitionList {
799 var pConsumer sarama.PartitionConsumer
800 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
801 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
802 return nil, err
803 }
804 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500805 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500806 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500807}
808
khenaidoo79232702018-12-04 11:00:41 -0500809func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500810 var i int
khenaidoo79232702018-12-04 11:00:41 -0500811 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500812 for i, channel = range channels {
813 if channel == ch {
814 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
815 close(channel)
816 return channels[:len(channels)-1]
817 }
818 }
819 return channels
820}