blob: 2df19e5e5675bb2e66c3caef2c8c01a7196347cd [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
33 log.AddPackage(log.JSON, log.WarnLevel, nil)
34}
35
// returnErrorFunction is a niladic function that reports only an error.
type (
	returnErrorFunction func() error
)
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
54 groupConsumer *scc.Consumer
khenaidoo4c1a5bf2018-11-29 15:53:42 -050055 consumerType int
56 groupName string
57 producerFlushFrequency int
58 producerFlushMessages int
59 producerFlushMaxmessages int
60 producerRetryMax int
61 producerRetryBackOff time.Duration
62 producerReturnSuccess bool
63 producerReturnErrors bool
64 consumerMaxwait int
65 maxProcessingTime int
66 numPartitions int
67 numReplicas int
68 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050069 doneCh chan int
70 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050072 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050074}
75
76type SaramaClientOption func(*SaramaClient)
77
78func Host(host string) SaramaClientOption {
79 return func(args *SaramaClient) {
80 args.KafkaHost = host
81 }
82}
83
84func Port(port int) SaramaClientOption {
85 return func(args *SaramaClient) {
86 args.KafkaPort = port
87 }
88}
89
khenaidoo4c1a5bf2018-11-29 15:53:42 -050090func ConsumerType(consumer int) SaramaClientOption {
91 return func(args *SaramaClient) {
92 args.consumerType = consumer
93 }
94}
95
96func ProducerFlushFrequency(frequency int) SaramaClientOption {
97 return func(args *SaramaClient) {
98 args.producerFlushFrequency = frequency
99 }
100}
101
102func ProducerFlushMessages(num int) SaramaClientOption {
103 return func(args *SaramaClient) {
104 args.producerFlushMessages = num
105 }
106}
107
108func ProducerFlushMaxMessages(num int) SaramaClientOption {
109 return func(args *SaramaClient) {
110 args.producerFlushMaxmessages = num
111 }
112}
113
khenaidoo90847922018-12-03 14:47:51 -0500114func ProducerMaxRetries(num int) SaramaClientOption {
115 return func(args *SaramaClient) {
116 args.producerRetryMax = num
117 }
118}
119
120func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
121 return func(args *SaramaClient) {
122 args.producerRetryBackOff = duration
123 }
124}
125
126func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500127 return func(args *SaramaClient) {
128 args.producerReturnErrors = opt
129 }
130}
131
khenaidoo90847922018-12-03 14:47:51 -0500132func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500133 return func(args *SaramaClient) {
134 args.producerReturnSuccess = opt
135 }
136}
137
138func ConsumerMaxWait(wait int) SaramaClientOption {
139 return func(args *SaramaClient) {
140 args.consumerMaxwait = wait
141 }
142}
143
144func MaxProcessingTime(pTime int) SaramaClientOption {
145 return func(args *SaramaClient) {
146 args.maxProcessingTime = pTime
147 }
148}
149
150func NumPartitions(number int) SaramaClientOption {
151 return func(args *SaramaClient) {
152 args.numPartitions = number
153 }
154}
155
156func NumReplicas(number int) SaramaClientOption {
157 return func(args *SaramaClient) {
158 args.numReplicas = number
159 }
160}
161
162func AutoCreateTopic(opt bool) SaramaClientOption {
163 return func(args *SaramaClient) {
164 args.autoCreateTopic = opt
165 }
166}
167
khenaidoo43c82122018-11-22 18:38:28 -0500168func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
169 client := &SaramaClient{
170 KafkaHost: DefaultKafkaHost,
171 KafkaPort: DefaultKafkaPort,
172 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500173 client.consumerType = DefaultConsumerType
174 client.producerFlushFrequency = DefaultProducerFlushFrequency
175 client.producerFlushMessages = DefaultProducerFlushMessages
176 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
177 client.producerReturnErrors = DefaultProducerReturnErrors
178 client.producerReturnSuccess = DefaultProducerReturnSuccess
179 client.producerRetryMax = DefaultProducerRetryMax
180 client.producerRetryBackOff = DefaultProducerRetryBackoff
181 client.consumerMaxwait = DefaultConsumerMaxwait
182 client.maxProcessingTime = DefaultMaxProcessingTime
183 client.numPartitions = DefaultNumberPartitions
184 client.numReplicas = DefaultNumberReplicas
185 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500186
187 for _, option := range opts {
188 option(client)
189 }
190
191 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500192 client.topicLockMap = make(map[string]*sync.RWMutex)
193 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500194 return client
195}
196
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500197func (sc *SaramaClient) Start() error {
198 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500199
200 // Create the Done channel
201 sc.doneCh = make(chan int, 1)
202
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500203 var err error
204
205 // Create the Cluster Admin
206 if err = sc.createClusterAdmin(); err != nil {
207 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
208 return err
209 }
210
khenaidoo43c82122018-11-22 18:38:28 -0500211 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500212 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500213 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
214 return err
215 }
216
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500217 // Create the master consumers
218 if err := sc.createConsumer(); err != nil {
219 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500220 return err
221 }
222
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500223 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500224 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
225
226 return nil
227}
228
229func (sc *SaramaClient) Stop() {
230 log.Info("stopping-sarama-client")
231
232 //Send a message over the done channel to close all long running routines
233 sc.doneCh <- 1
234
khenaidoo43c82122018-11-22 18:38:28 -0500235 if sc.producer != nil {
236 if err := sc.producer.Close(); err != nil {
237 panic(err)
238 }
239 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240
khenaidoo43c82122018-11-22 18:38:28 -0500241 if sc.consumer != nil {
242 if err := sc.consumer.Close(); err != nil {
243 panic(err)
244 }
245 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500246
247 if sc.groupConsumer != nil {
248 if err := sc.groupConsumer.Close(); err != nil {
249 panic(err)
250 }
251 }
252
253 if sc.cAdmin != nil {
254 if err := sc.cAdmin.Close(); err != nil {
255 panic(err)
256 }
257 }
258
259 //TODO: Clear the consumers map
260 sc.clearConsumerChannelMap()
261
262 log.Info("sarama-client-stopped")
263}
264
265//CreateTopic creates a topic on the Kafka Broker.
266func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidood2b6df92018-12-13 16:37:20 -0500267 sc.lockTopic(topic)
268 defer sc.unLockTopic(topic)
269
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500270 // Set the topic details
271 topicDetail := &sarama.TopicDetail{}
272 topicDetail.NumPartitions = int32(numPartition)
273 topicDetail.ReplicationFactor = int16(repFactor)
274 topicDetail.ConfigEntries = make(map[string]*string)
275 topicDetails := make(map[string]*sarama.TopicDetail)
276 topicDetails[topic.Name] = topicDetail
277
278 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
279 if err == sarama.ErrTopicAlreadyExists {
280 // Not an error
281 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
282 return nil
283 }
284 log.Errorw("create-topic-failure", log.Fields{"error": err})
285 return err
286 }
287 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
288 // do so.
289 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
290 return nil
291}
292
293//DeleteTopic removes a topic from the kafka Broker
294func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500295 sc.lockTopic(topic)
296 defer sc.unLockTopic(topic)
297
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298 // Remove the topic from the broker
299 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
300 if err == sarama.ErrUnknownTopicOrPartition {
301 // Not an error as does not exist
302 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
303 return nil
304 }
305 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
306 return err
307 }
308
309 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
310 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
311 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
312 return err
313 }
314 return nil
315}
316
317// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
318// messages from that topic
khenaidoo79232702018-12-04 11:00:41 -0500319func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500320 sc.lockTopic(topic)
321 defer sc.unLockTopic(topic)
322
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500323 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
324
325 // If a consumers already exist for that topic then resuse it
326 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
327 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
328 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500329 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500330 sc.addChannelToConsumerChannelMap(topic, ch)
331 return ch, nil
332 }
333
334 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500335 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500336 var err error
337
338 // Use the consumerType option to figure out the type of consumer to launch
339 if sc.consumerType == PartitionConsumer {
340 if sc.autoCreateTopic {
341 if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
342 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
343 return nil, err
344 }
345 }
346 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
347 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
348 return nil, err
349 }
350 } else if sc.consumerType == GroupCustomer {
351 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
352 // does not consume from a precreated topic in some scenarios
353 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
354 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
355 return nil, err
356 }
357 } else {
358 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
359 return nil, errors.New("unknown-consumer-type")
360 }
361
362 return consumerListeningChannel, nil
363}
364
365//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500366func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500367 sc.lockTopic(topic)
368 defer sc.unLockTopic(topic)
369
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500370 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
371 err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
372 return err
373}
374
375// send formats and sends the request onto the kafka messaging bus.
376func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
377
378 // Assert message is a proto message
379 var protoMsg proto.Message
380 var ok bool
381 // ascertain the value interface type is a proto.Message
382 if protoMsg, ok = msg.(proto.Message); !ok {
383 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
384 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
385 }
386
387 var marshalled []byte
388 var err error
389 // Create the Sarama producer message
390 if marshalled, err = proto.Marshal(protoMsg); err != nil {
391 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
392 return err
393 }
394 key := ""
395 if len(keys) > 0 {
396 key = keys[0] // Only the first key is relevant
397 }
398 kafkaMsg := &sarama.ProducerMessage{
399 Topic: topic.Name,
400 Key: sarama.StringEncoder(key),
401 Value: sarama.ByteEncoder(marshalled),
402 }
403
404 // Send message to kafka
405 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500406
407 // Wait for result
408 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
409 select {
410 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500411 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500412 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500413 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500414 return notOk
415 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500416 return nil
417}
418
419func (sc *SaramaClient) createClusterAdmin() error {
420 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
421 config := sarama.NewConfig()
422 config.Version = sarama.V1_0_0_0
423
424 // Create a cluster Admin
425 var cAdmin sarama.ClusterAdmin
426 var err error
427 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
428 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
429 return err
430 }
431 sc.cAdmin = cAdmin
432 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500433}
434
khenaidood2b6df92018-12-13 16:37:20 -0500435func (sc *SaramaClient) lockTopic(topic *Topic) {
436 sc.lockOfTopicLockMap.Lock()
437 if _, exist := sc.topicLockMap[topic.Name]; exist {
438 sc.lockOfTopicLockMap.Unlock()
439 sc.topicLockMap[topic.Name].Lock()
440 } else {
441 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
442 sc.lockOfTopicLockMap.Unlock()
443 sc.topicLockMap[topic.Name].Lock()
444 }
445}
446
447func (sc *SaramaClient) unLockTopic(topic *Topic) {
448 sc.lockOfTopicLockMap.Lock()
449 defer sc.lockOfTopicLockMap.Unlock()
450 if _, exist := sc.topicLockMap[topic.Name]; exist {
451 sc.topicLockMap[topic.Name].Unlock()
452 }
453}
454
khenaidoo43c82122018-11-22 18:38:28 -0500455func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
456 sc.lockTopicToConsumerChannelMap.Lock()
457 defer sc.lockTopicToConsumerChannelMap.Unlock()
458 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
459 sc.topicToConsumerChannelMap[id] = arg
460 }
461}
462
463func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
464 sc.lockTopicToConsumerChannelMap.Lock()
465 defer sc.lockTopicToConsumerChannelMap.Unlock()
466 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
467 delete(sc.topicToConsumerChannelMap, id)
468 }
469}
470
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500471func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500472 sc.lockTopicToConsumerChannelMap.Lock()
473 defer sc.lockTopicToConsumerChannelMap.Unlock()
474
475 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
476 return consumerCh
477 }
478 return nil
479}
480
khenaidoo79232702018-12-04 11:00:41 -0500481func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500482 sc.lockTopicToConsumerChannelMap.Lock()
483 defer sc.lockTopicToConsumerChannelMap.Unlock()
484 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
485 consumerCh.channels = append(consumerCh.channels, ch)
486 return
487 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500488 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
489}
490
491//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
492func closeConsumers(consumers []interface{}) error {
493 var err error
494 for _, consumer := range consumers {
495 // Is it a partition consumers?
496 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
497 if errTemp := partionConsumer.Close(); errTemp != nil {
498 log.Debugw("partition!!!", log.Fields{"err": errTemp})
499 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
500 // This can occur on race condition
501 err = nil
502 } else {
503 err = errTemp
504 }
505 }
506 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
507 if errTemp := groupConsumer.Close(); errTemp != nil {
508 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
509 // This can occur on race condition
510 err = nil
511 } else {
512 err = errTemp
513 }
514 }
515 }
516 }
517 return err
khenaidoo43c82122018-11-22 18:38:28 -0500518}
519
khenaidoo79232702018-12-04 11:00:41 -0500520func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500521 sc.lockTopicToConsumerChannelMap.Lock()
522 defer sc.lockTopicToConsumerChannelMap.Unlock()
523 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
524 // Channel will be closed in the removeChannel method
525 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500526 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500527 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500528 log.Debugw("closing-consumers", log.Fields{"topic": topic})
529 err := closeConsumers(consumerCh.consumers)
530 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500531 delete(sc.topicToConsumerChannelMap, topic.Name)
532 return err
533 }
534 return nil
535 }
536 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
537 return errors.New("topic-does-not-exist")
538}
539
540func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
541 sc.lockTopicToConsumerChannelMap.Lock()
542 defer sc.lockTopicToConsumerChannelMap.Unlock()
543 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
544 for _, ch := range consumerCh.channels {
545 // Channel will be closed in the removeChannel method
546 removeChannel(consumerCh.channels, ch)
547 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500548 err := closeConsumers(consumerCh.consumers)
549 //if err == sarama.ErrUnknownTopicOrPartition {
550 // // Not an error
551 // err = nil
552 //}
553 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500554 delete(sc.topicToConsumerChannelMap, topic.Name)
555 return err
556 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500557 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
558 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500559}
560
561func (sc *SaramaClient) clearConsumerChannelMap() error {
562 sc.lockTopicToConsumerChannelMap.Lock()
563 defer sc.lockTopicToConsumerChannelMap.Unlock()
564 var err error
565 for topic, consumerCh := range sc.topicToConsumerChannelMap {
566 for _, ch := range consumerCh.channels {
567 // Channel will be closed in the removeChannel method
568 removeChannel(consumerCh.channels, ch)
569 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500570 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
571 err = errTemp
572 }
573 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500574 delete(sc.topicToConsumerChannelMap, topic)
575 }
576 return err
577}
578
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500579//createPublisher creates the publisher which is used to send a message onto kafka
580func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500581 // This Creates the publisher
582 config := sarama.NewConfig()
583 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500584 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
585 config.Producer.Flush.Messages = sc.producerFlushMessages
586 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
587 config.Producer.Return.Errors = sc.producerReturnErrors
588 config.Producer.Return.Successes = sc.producerReturnSuccess
589 //config.Producer.RequiredAcks = sarama.WaitForAll
590 config.Producer.RequiredAcks = sarama.WaitForLocal
591
khenaidoo43c82122018-11-22 18:38:28 -0500592 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
593 brokers := []string{kafkaFullAddr}
594
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500595 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
596 log.Errorw("error-starting-publisher", log.Fields{"error": err})
597 return err
598 } else {
599 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500600 }
601 log.Info("Kafka-publisher-created")
602 return nil
603}
604
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500605func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500606 config := sarama.NewConfig()
607 config.Consumer.Return.Errors = true
608 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500609 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
610 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500611 config.Consumer.Offsets.Initial = sarama.OffsetNewest
612 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
613 brokers := []string{kafkaFullAddr}
614
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500615 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
616 log.Errorw("error-starting-consumers", log.Fields{"error": err})
617 return err
618 } else {
619 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500620 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500621 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500622 return nil
623}
624
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500625// createGroupConsumer creates a consumers group
khenaidoo43c82122018-11-22 18:38:28 -0500626func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
627 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500628 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500629 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500630 //config.Consumer.Return.Errors = true
631 //config.Group.Return.Notifications = false
632 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
633 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500634 config.Consumer.Offsets.Initial = sarama.OffsetNewest
635 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
636 brokers := []string{kafkaFullAddr}
637
638 if groupId == nil {
639 g := DefaultGroupName
640 groupId = &g
641 }
642 topics := []string{topic.Name}
643 var consumer *scc.Consumer
644 var err error
645
khenaidoo43c82122018-11-22 18:38:28 -0500646 if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500647 log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500648 return nil, err
649 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500650 log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500651 //time.Sleep(10*time.Second)
652 sc.groupConsumer = consumer
653 return consumer, nil
654}
655
khenaidoo43c82122018-11-22 18:38:28 -0500656// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
657// topic via the unique channel each subsciber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500658func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500659 // Need to go over all channels and publish messages to them - do we need to copy msg?
660 sc.lockTopicToConsumerChannelMap.Lock()
661 defer sc.lockTopicToConsumerChannelMap.Unlock()
662 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500663 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500664 c <- protoMessage
665 }(ch)
666 }
667}
668
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500669func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
670 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500671startloop:
672 for {
673 select {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 case err := <-consumer.Errors():
675 if err != nil {
676 log.Warnw("partition-consumers-error", log.Fields{"error": err})
677 } else {
678 // There is a race condition when this loop is stopped and the consumer is closed where
679 // the actual error comes as nil
680 log.Warn("partition-consumers-error")
681 }
682 case msg := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500683 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500684 if msg == nil {
685 // There is a race condition when this loop is stopped and the consumer is closed where
686 // the actual msg comes as nil
687 break startloop
688 }
689 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500690 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500691 if err := proto.Unmarshal(msgBody, icm); err != nil {
692 log.Warnw("partition-invalid-message", log.Fields{"error": err})
693 continue
694 }
695 go sc.dispatchToConsumers(consumerChnls, icm)
696 case <-sc.doneCh:
697 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
698 break startloop
699 }
700 }
701 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
702}
703
704func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
705 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
706
707startloop:
708 for {
709 select {
710 case err := <-consumer.Errors():
711 if err != nil {
712 log.Warnw("group-consumers-error", log.Fields{"error": err})
713 } else {
714 // There is a race condition when this loop is stopped and the consumer is closed where
715 // the actual error comes as nil
716 log.Warn("group-consumers-error")
717 }
718 case msg := <-consumer.Messages():
719 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
720 if msg == nil {
721 // There is a race condition when this loop is stopped and the consumer is closed where
722 // the actual msg comes as nil
723 break startloop
724 }
khenaidoo43c82122018-11-22 18:38:28 -0500725 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500726 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500727 if err := proto.Unmarshal(msgBody, icm); err != nil {
728 log.Warnw("invalid-message", log.Fields{"error": err})
729 continue
730 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500731 go sc.dispatchToConsumers(consumerChnls, icm)
732 consumer.MarkOffset(msg, "")
733 case ntf := <-consumer.Notifications():
734 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500735 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500736 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500737 break startloop
738 }
739 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500740 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500741}
742
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500743func (sc *SaramaClient) startConsumers(topic *Topic) error {
744 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
745 var consumerCh *consumerChannels
746 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
747 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
748 return errors.New("consumers-not-exist")
749 }
750 // For each consumer listening for that topic, start a consumption loop
751 for _, consumer := range consumerCh.consumers {
752 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
753 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
754 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
755 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
756 } else {
757 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
758 return errors.New("invalid-consumer")
759 }
760 }
761 return nil
762}
763
764//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
765//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500766func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500767 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500768 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500769
770 if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
771 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500772 return nil, err
773 }
774
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500775 consumersIf := make([]interface{}, 0)
776 for _, pConsumer := range pConsumers {
777 consumersIf = append(consumersIf, pConsumer)
778 }
779
780 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500781 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500782 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500783 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500784 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500785 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500786 }
787
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500788 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500789 sc.addTopicToConsumerChannelMap(topic.Name, cc)
790
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500791 //Start a consumers to listen on that specific topic
792 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500793
794 return consumerListeningChannel, nil
795}
796
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500797// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
798// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500799func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500800 // TODO: Replace this development partition consumers with a group consumers
801 var pConsumer *scc.Consumer
802 var err error
803 if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
804 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
805 return nil, err
806 }
807 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
808 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500809 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500810 cc := &consumerChannels{
811 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500812 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500813 }
814
815 // Add the consumers channel to the map
816 sc.addTopicToConsumerChannelMap(topic.Name, cc)
817
818 //Start a consumers to listen on that specific topic
819 go sc.startConsumers(topic)
820
821 return consumerListeningChannel, nil
822}
823
824func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
825 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500826 partitionList, err := sc.consumer.Partitions(topic.Name)
827 if err != nil {
828 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
829 return nil, err
830 }
831
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500832 pConsumers := make([]sarama.PartitionConsumer, 0)
833 for _, partition := range partitionList {
834 var pConsumer sarama.PartitionConsumer
835 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
836 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
837 return nil, err
838 }
839 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500840 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500841 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500842}
843
khenaidoo79232702018-12-04 11:00:41 -0500844func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500845 var i int
khenaidoo79232702018-12-04 11:00:41 -0500846 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500847 for i, channel = range channels {
848 if channel == ch {
849 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
850 close(channel)
851 return channels[:len(channels)-1]
852 }
853 }
854 return channels
855}