blob: e6699404c5caa868f564cdd704119ed766faa7a4 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
36type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
55 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050056 consumerType int
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050070 doneCh chan int
71 topicToConsumerChannelMap map[string]*consumerChannels
72 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050073 topicLockMap map[string]*sync.RWMutex
74 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050075}
76
77type SaramaClientOption func(*SaramaClient)
78
79func Host(host string) SaramaClientOption {
80 return func(args *SaramaClient) {
81 args.KafkaHost = host
82 }
83}
84
85func Port(port int) SaramaClientOption {
86 return func(args *SaramaClient) {
87 args.KafkaPort = port
88 }
89}
90
khenaidooca301322019-01-09 23:06:32 -050091func ConsumerGroupPrefix(prefix string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.consumerGroupPrefix = prefix
94 }
95}
96
97func ConsumerGroupName(name string) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.consumerGroupName = name
100 }
101}
102
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500103func ConsumerType(consumer int) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerType = consumer
106 }
107}
108
109func ProducerFlushFrequency(frequency int) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.producerFlushFrequency = frequency
112 }
113}
114
115func ProducerFlushMessages(num int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.producerFlushMessages = num
118 }
119}
120
121func ProducerFlushMaxMessages(num int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushMaxmessages = num
124 }
125}
126
khenaidoo90847922018-12-03 14:47:51 -0500127func ProducerMaxRetries(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerRetryMax = num
130 }
131}
132
133func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerRetryBackOff = duration
136 }
137}
138
139func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500140 return func(args *SaramaClient) {
141 args.producerReturnErrors = opt
142 }
143}
144
khenaidoo90847922018-12-03 14:47:51 -0500145func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500146 return func(args *SaramaClient) {
147 args.producerReturnSuccess = opt
148 }
149}
150
151func ConsumerMaxWait(wait int) SaramaClientOption {
152 return func(args *SaramaClient) {
153 args.consumerMaxwait = wait
154 }
155}
156
157func MaxProcessingTime(pTime int) SaramaClientOption {
158 return func(args *SaramaClient) {
159 args.maxProcessingTime = pTime
160 }
161}
162
163func NumPartitions(number int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.numPartitions = number
166 }
167}
168
169func NumReplicas(number int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.numReplicas = number
172 }
173}
174
175func AutoCreateTopic(opt bool) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.autoCreateTopic = opt
178 }
179}
180
khenaidoo43c82122018-11-22 18:38:28 -0500181func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
182 client := &SaramaClient{
183 KafkaHost: DefaultKafkaHost,
184 KafkaPort: DefaultKafkaPort,
185 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500186 client.consumerType = DefaultConsumerType
187 client.producerFlushFrequency = DefaultProducerFlushFrequency
188 client.producerFlushMessages = DefaultProducerFlushMessages
189 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
190 client.producerReturnErrors = DefaultProducerReturnErrors
191 client.producerReturnSuccess = DefaultProducerReturnSuccess
192 client.producerRetryMax = DefaultProducerRetryMax
193 client.producerRetryBackOff = DefaultProducerRetryBackoff
194 client.consumerMaxwait = DefaultConsumerMaxwait
195 client.maxProcessingTime = DefaultMaxProcessingTime
196 client.numPartitions = DefaultNumberPartitions
197 client.numReplicas = DefaultNumberReplicas
198 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500199
200 for _, option := range opts {
201 option(client)
202 }
203
khenaidooca301322019-01-09 23:06:32 -0500204 client.groupConsumers = make(map[string]*scc.Consumer)
205
khenaidoo43c82122018-11-22 18:38:28 -0500206 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500207 client.topicLockMap = make(map[string]*sync.RWMutex)
208 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500209 return client
210}
211
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500212func (sc *SaramaClient) Start() error {
213 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500214
215 // Create the Done channel
216 sc.doneCh = make(chan int, 1)
217
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500218 var err error
219
220 // Create the Cluster Admin
221 if err = sc.createClusterAdmin(); err != nil {
222 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
223 return err
224 }
225
khenaidoo43c82122018-11-22 18:38:28 -0500226 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500227 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500228 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
229 return err
230 }
231
khenaidooca301322019-01-09 23:06:32 -0500232 if sc.consumerType == DefaultConsumerType {
233 // Create the master consumers
234 if err := sc.createConsumer(); err != nil {
235 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
236 return err
237 }
khenaidoo43c82122018-11-22 18:38:28 -0500238 }
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500241 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
242
khenaidooca301322019-01-09 23:06:32 -0500243 log.Info("kafka-sarama-client-started")
244
khenaidoo43c82122018-11-22 18:38:28 -0500245 return nil
246}
247
248func (sc *SaramaClient) Stop() {
249 log.Info("stopping-sarama-client")
250
251 //Send a message over the done channel to close all long running routines
252 sc.doneCh <- 1
253
khenaidoo43c82122018-11-22 18:38:28 -0500254 if sc.producer != nil {
255 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500256 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500257 }
258 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500259
khenaidoo43c82122018-11-22 18:38:28 -0500260 if sc.consumer != nil {
261 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500262 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500263 }
264 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500265
khenaidooca301322019-01-09 23:06:32 -0500266 for key, val := range sc.groupConsumers {
267 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
268 if err := val.Close(); err != nil {
269 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500270 }
271 }
272
273 if sc.cAdmin != nil {
274 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500275 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500276 }
277 }
278
279 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500280 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500281
282 log.Info("sarama-client-stopped")
283}
284
khenaidooca301322019-01-09 23:06:32 -0500285//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
286// the invoking function must hold the lock
287func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500288 // Set the topic details
289 topicDetail := &sarama.TopicDetail{}
290 topicDetail.NumPartitions = int32(numPartition)
291 topicDetail.ReplicationFactor = int16(repFactor)
292 topicDetail.ConfigEntries = make(map[string]*string)
293 topicDetails := make(map[string]*sarama.TopicDetail)
294 topicDetails[topic.Name] = topicDetail
295
296 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
297 if err == sarama.ErrTopicAlreadyExists {
298 // Not an error
299 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
300 return nil
301 }
302 log.Errorw("create-topic-failure", log.Fields{"error": err})
303 return err
304 }
305 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
306 // do so.
307 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
308 return nil
309}
310
khenaidooca301322019-01-09 23:06:32 -0500311//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
312// ensure no two go routines are performing operations on the same topic
313func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
314 sc.lockTopic(topic)
315 defer sc.unLockTopic(topic)
316
317 return sc.createTopic(topic, numPartition, repFactor)
318}
319
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500320//DeleteTopic removes a topic from the kafka Broker
321func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500322 sc.lockTopic(topic)
323 defer sc.unLockTopic(topic)
324
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500325 // Remove the topic from the broker
326 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
327 if err == sarama.ErrUnknownTopicOrPartition {
328 // Not an error as does not exist
329 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
330 return nil
331 }
332 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
333 return err
334 }
335
336 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
337 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
338 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
339 return err
340 }
341 return nil
342}
343
344// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
345// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500346func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500347 sc.lockTopic(topic)
348 defer sc.unLockTopic(topic)
349
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500350 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
351
352 // If a consumers already exist for that topic then resuse it
353 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
354 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
355 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500356 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500357 sc.addChannelToConsumerChannelMap(topic, ch)
358 return ch, nil
359 }
360
361 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500362 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500363 var err error
364
365 // Use the consumerType option to figure out the type of consumer to launch
366 if sc.consumerType == PartitionConsumer {
367 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500368 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500369 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
370 return nil, err
371 }
372 }
373 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
374 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
375 return nil, err
376 }
377 } else if sc.consumerType == GroupCustomer {
378 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
379 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500380 //if sc.autoCreateTopic {
381 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
382 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
383 // return nil, err
384 // }
385 //}
386 //groupId := sc.consumerGroupName
387 groupId := getGroupId(kvArgs...)
388 // Include the group prefix
389 if groupId != "" {
390 groupId = sc.consumerGroupPrefix + groupId
391 } else {
392 // Need to use a unique group Id per topic
393 groupId = sc.consumerGroupPrefix + topic.Name
394 }
395 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
396 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500397 return nil, err
398 }
khenaidooca301322019-01-09 23:06:32 -0500399
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500400 } else {
401 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
402 return nil, errors.New("unknown-consumer-type")
403 }
404
405 return consumerListeningChannel, nil
406}
407
408//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500409func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500410 sc.lockTopic(topic)
411 defer sc.unLockTopic(topic)
412
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500413 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
414 err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
415 return err
416}
417
418// send formats and sends the request onto the kafka messaging bus.
419func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
420
421 // Assert message is a proto message
422 var protoMsg proto.Message
423 var ok bool
424 // ascertain the value interface type is a proto.Message
425 if protoMsg, ok = msg.(proto.Message); !ok {
426 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
427 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
428 }
429
430 var marshalled []byte
431 var err error
432 // Create the Sarama producer message
433 if marshalled, err = proto.Marshal(protoMsg); err != nil {
434 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
435 return err
436 }
437 key := ""
438 if len(keys) > 0 {
439 key = keys[0] // Only the first key is relevant
440 }
441 kafkaMsg := &sarama.ProducerMessage{
442 Topic: topic.Name,
443 Key: sarama.StringEncoder(key),
444 Value: sarama.ByteEncoder(marshalled),
445 }
446
447 // Send message to kafka
448 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500449
450 // Wait for result
451 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
452 select {
453 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500454 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500455 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500456 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500457 return notOk
458 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500459 return nil
460}
461
khenaidooca301322019-01-09 23:06:32 -0500462// getGroupId returns the group id from the key-value args.
463func getGroupId(kvArgs ...*KVArg) string {
464 for _, arg := range kvArgs {
465 if arg.Key == GroupIdKey {
466 return arg.Value.(string)
467 }
468 }
469 return ""
470}
471
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500472func (sc *SaramaClient) createClusterAdmin() error {
473 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
474 config := sarama.NewConfig()
475 config.Version = sarama.V1_0_0_0
476
477 // Create a cluster Admin
478 var cAdmin sarama.ClusterAdmin
479 var err error
480 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
481 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
482 return err
483 }
484 sc.cAdmin = cAdmin
485 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500486}
487
khenaidood2b6df92018-12-13 16:37:20 -0500488func (sc *SaramaClient) lockTopic(topic *Topic) {
489 sc.lockOfTopicLockMap.Lock()
490 if _, exist := sc.topicLockMap[topic.Name]; exist {
491 sc.lockOfTopicLockMap.Unlock()
492 sc.topicLockMap[topic.Name].Lock()
493 } else {
494 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
495 sc.lockOfTopicLockMap.Unlock()
496 sc.topicLockMap[topic.Name].Lock()
497 }
498}
499
500func (sc *SaramaClient) unLockTopic(topic *Topic) {
501 sc.lockOfTopicLockMap.Lock()
502 defer sc.lockOfTopicLockMap.Unlock()
503 if _, exist := sc.topicLockMap[topic.Name]; exist {
504 sc.topicLockMap[topic.Name].Unlock()
505 }
506}
507
khenaidoo43c82122018-11-22 18:38:28 -0500508func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
509 sc.lockTopicToConsumerChannelMap.Lock()
510 defer sc.lockTopicToConsumerChannelMap.Unlock()
511 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
512 sc.topicToConsumerChannelMap[id] = arg
513 }
514}
515
516func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
517 sc.lockTopicToConsumerChannelMap.Lock()
518 defer sc.lockTopicToConsumerChannelMap.Unlock()
519 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
520 delete(sc.topicToConsumerChannelMap, id)
521 }
522}
523
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500524func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500525 sc.lockTopicToConsumerChannelMap.Lock()
526 defer sc.lockTopicToConsumerChannelMap.Unlock()
527
528 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
529 return consumerCh
530 }
531 return nil
532}
533
khenaidoo79232702018-12-04 11:00:41 -0500534func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500535 sc.lockTopicToConsumerChannelMap.Lock()
536 defer sc.lockTopicToConsumerChannelMap.Unlock()
537 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
538 consumerCh.channels = append(consumerCh.channels, ch)
539 return
540 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500541 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
542}
543
544//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
545func closeConsumers(consumers []interface{}) error {
546 var err error
547 for _, consumer := range consumers {
548 // Is it a partition consumers?
549 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
550 if errTemp := partionConsumer.Close(); errTemp != nil {
551 log.Debugw("partition!!!", log.Fields{"err": errTemp})
552 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
553 // This can occur on race condition
554 err = nil
555 } else {
556 err = errTemp
557 }
558 }
559 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
560 if errTemp := groupConsumer.Close(); errTemp != nil {
561 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
562 // This can occur on race condition
563 err = nil
564 } else {
565 err = errTemp
566 }
567 }
568 }
569 }
570 return err
khenaidoo43c82122018-11-22 18:38:28 -0500571}
572
khenaidoo79232702018-12-04 11:00:41 -0500573func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500574 sc.lockTopicToConsumerChannelMap.Lock()
575 defer sc.lockTopicToConsumerChannelMap.Unlock()
576 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
577 // Channel will be closed in the removeChannel method
578 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500579 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500580 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500581 log.Debugw("closing-consumers", log.Fields{"topic": topic})
582 err := closeConsumers(consumerCh.consumers)
583 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500584 delete(sc.topicToConsumerChannelMap, topic.Name)
585 return err
586 }
587 return nil
588 }
589 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
590 return errors.New("topic-does-not-exist")
591}
592
593func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
594 sc.lockTopicToConsumerChannelMap.Lock()
595 defer sc.lockTopicToConsumerChannelMap.Unlock()
596 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
597 for _, ch := range consumerCh.channels {
598 // Channel will be closed in the removeChannel method
599 removeChannel(consumerCh.channels, ch)
600 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500601 err := closeConsumers(consumerCh.consumers)
602 //if err == sarama.ErrUnknownTopicOrPartition {
603 // // Not an error
604 // err = nil
605 //}
606 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500607 delete(sc.topicToConsumerChannelMap, topic.Name)
608 return err
609 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500610 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
611 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500612}
613
614func (sc *SaramaClient) clearConsumerChannelMap() error {
615 sc.lockTopicToConsumerChannelMap.Lock()
616 defer sc.lockTopicToConsumerChannelMap.Unlock()
617 var err error
618 for topic, consumerCh := range sc.topicToConsumerChannelMap {
619 for _, ch := range consumerCh.channels {
620 // Channel will be closed in the removeChannel method
621 removeChannel(consumerCh.channels, ch)
622 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500623 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
624 err = errTemp
625 }
626 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500627 delete(sc.topicToConsumerChannelMap, topic)
628 }
629 return err
630}
631
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500632//createPublisher creates the publisher which is used to send a message onto kafka
633func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500634 // This Creates the publisher
635 config := sarama.NewConfig()
636 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500637 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
638 config.Producer.Flush.Messages = sc.producerFlushMessages
639 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
640 config.Producer.Return.Errors = sc.producerReturnErrors
641 config.Producer.Return.Successes = sc.producerReturnSuccess
642 //config.Producer.RequiredAcks = sarama.WaitForAll
643 config.Producer.RequiredAcks = sarama.WaitForLocal
644
khenaidoo43c82122018-11-22 18:38:28 -0500645 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
646 brokers := []string{kafkaFullAddr}
647
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500648 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
649 log.Errorw("error-starting-publisher", log.Fields{"error": err})
650 return err
651 } else {
652 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500653 }
654 log.Info("Kafka-publisher-created")
655 return nil
656}
657
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500658func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500659 config := sarama.NewConfig()
660 config.Consumer.Return.Errors = true
661 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500662 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
663 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500664 config.Consumer.Offsets.Initial = sarama.OffsetNewest
665 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
666 brokers := []string{kafkaFullAddr}
667
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500668 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
669 log.Errorw("error-starting-consumers", log.Fields{"error": err})
670 return err
671 } else {
672 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500673 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500675 return nil
676}
677
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500678// createGroupConsumer creates a consumers group
khenaidooca301322019-01-09 23:06:32 -0500679func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500680 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500681 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500682 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500683 //config.Consumer.Return.Errors = true
684 //config.Group.Return.Notifications = false
685 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
686 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500687 config.Consumer.Offsets.Initial = sarama.OffsetNewest
khenaidooca301322019-01-09 23:06:32 -0500688 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500689 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
690 brokers := []string{kafkaFullAddr}
691
khenaidoo43c82122018-11-22 18:38:28 -0500692 topics := []string{topic.Name}
693 var consumer *scc.Consumer
694 var err error
695
khenaidooca301322019-01-09 23:06:32 -0500696 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
697 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500698 return nil, err
699 }
khenaidooca301322019-01-09 23:06:32 -0500700 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500701 //time.Sleep(10*time.Second)
khenaidooca301322019-01-09 23:06:32 -0500702 //sc.groupConsumer = consumer
703 sc.groupConsumers[topic.Name] = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500704 return consumer, nil
705}
706
khenaidoo43c82122018-11-22 18:38:28 -0500707// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
708// topic via the unique channel each subsciber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500709func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500710 // Need to go over all channels and publish messages to them - do we need to copy msg?
711 sc.lockTopicToConsumerChannelMap.Lock()
712 defer sc.lockTopicToConsumerChannelMap.Unlock()
713 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500714 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500715 c <- protoMessage
716 }(ch)
717 }
718}
719
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500720func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
721 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500722startloop:
723 for {
724 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500725 case err, ok := <-consumer.Errors():
726 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500727 log.Warnw("partition-consumers-error", log.Fields{"error": err})
728 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500729 // Channel is closed
730 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500731 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500732 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500733 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500734 if !ok {
735 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500736 break startloop
737 }
738 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500739 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500740 if err := proto.Unmarshal(msgBody, icm); err != nil {
741 log.Warnw("partition-invalid-message", log.Fields{"error": err})
742 continue
743 }
744 go sc.dispatchToConsumers(consumerChnls, icm)
745 case <-sc.doneCh:
746 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
747 break startloop
748 }
749 }
750 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
751}
752
753func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
754 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
755
756startloop:
757 for {
758 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500759 case err, ok := <-consumer.Errors():
760 if ok {
khenaidooca301322019-01-09 23:06:32 -0500761 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500762 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500763 // channel is closed
764 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500765 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500766 case msg, ok := <-consumer.Messages():
767 if !ok {
768 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500769 break startloop
770 }
khenaidooca301322019-01-09 23:06:32 -0500771 log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500772 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500773 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500774 if err := proto.Unmarshal(msgBody, icm); err != nil {
775 log.Warnw("invalid-message", log.Fields{"error": err})
776 continue
777 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500778 go sc.dispatchToConsumers(consumerChnls, icm)
779 consumer.MarkOffset(msg, "")
780 case ntf := <-consumer.Notifications():
781 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500782 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500783 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500784 break startloop
785 }
786 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500787 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500788}
789
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500790func (sc *SaramaClient) startConsumers(topic *Topic) error {
791 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
792 var consumerCh *consumerChannels
793 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
794 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
795 return errors.New("consumers-not-exist")
796 }
797 // For each consumer listening for that topic, start a consumption loop
798 for _, consumer := range consumerCh.consumers {
799 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
800 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
801 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
802 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
803 } else {
804 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
805 return errors.New("invalid-consumer")
806 }
807 }
808 return nil
809}
810
811//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
812//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500813func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500814 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500815 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500816
817 if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
818 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500819 return nil, err
820 }
821
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500822 consumersIf := make([]interface{}, 0)
823 for _, pConsumer := range pConsumers {
824 consumersIf = append(consumersIf, pConsumer)
825 }
826
827 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500828 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500829 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500830 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500831 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500832 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500833 }
834
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500835 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500836 sc.addTopicToConsumerChannelMap(topic.Name, cc)
837
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500838 //Start a consumers to listen on that specific topic
839 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500840
841 return consumerListeningChannel, nil
842}
843
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500844// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
845// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500846func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500847 // TODO: Replace this development partition consumers with a group consumers
848 var pConsumer *scc.Consumer
849 var err error
khenaidooca301322019-01-09 23:06:32 -0500850 if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500851 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
852 return nil, err
853 }
854 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
855 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500856 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500857 cc := &consumerChannels{
858 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500859 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500860 }
861
862 // Add the consumers channel to the map
863 sc.addTopicToConsumerChannelMap(topic.Name, cc)
864
865 //Start a consumers to listen on that specific topic
866 go sc.startConsumers(topic)
867
868 return consumerListeningChannel, nil
869}
870
871func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
872 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500873 partitionList, err := sc.consumer.Partitions(topic.Name)
874 if err != nil {
875 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
876 return nil, err
877 }
878
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500879 pConsumers := make([]sarama.PartitionConsumer, 0)
880 for _, partition := range partitionList {
881 var pConsumer sarama.PartitionConsumer
882 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
883 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
884 return nil, err
885 }
886 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500887 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500888 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500889}
890
khenaidoo79232702018-12-04 11:00:41 -0500891func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500892 var i int
khenaidoo79232702018-12-04 11:00:41 -0500893 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500894 for i, channel = range channels {
895 if channel == ch {
896 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
897 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500898 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -0500899 return channels[:len(channels)-1]
900 }
901 }
902 return channels
903}