blob: 73025d97898f8a9da3bb231eeb404eedef4b00a3 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
Scott Bakerf2596722019-09-27 12:39:56 -070022 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050023 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000024 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050025 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070027 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080028 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Bakerf2596722019-09-27 12:39:56 -070029 "strings"
30 "sync"
31 "time"
khenaidoo43c82122018-11-22 18:38:28 -050032)
33
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034func init() {
khenaidooca301322019-01-09 23:06:32 -050035 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050036}
37
// returnErrorFunction is a callback that takes no arguments and reports its
// outcome as an error.
type returnErrorFunction func() error
39
40// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
41// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
42//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050043type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050044 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050045 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050046}
47
48// SaramaClient represents the messaging proxy
49type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050050 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050051 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050056 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040057 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 consumerType int
khenaidooca301322019-01-09 23:06:32 -050060 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050061 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050073 doneCh chan int
74 topicToConsumerChannelMap map[string]*consumerChannels
75 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050076 topicLockMap map[string]*sync.RWMutex
77 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053078 metadataMaxRetry int
Scott Bakeree6a0872019-10-29 15:59:52 -070079 alive bool
80 liveness chan bool
81 livenessChannelInterval time.Duration
82 lastLivenessTime time.Time
83 started bool
khenaidoo43c82122018-11-22 18:38:28 -050084}
85
86type SaramaClientOption func(*SaramaClient)
87
88func Host(host string) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaHost = host
91 }
92}
93
94func Port(port int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.KafkaPort = port
97 }
98}
99
khenaidooca301322019-01-09 23:06:32 -0500100func ConsumerGroupPrefix(prefix string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupPrefix = prefix
103 }
104}
105
106func ConsumerGroupName(name string) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerGroupName = name
109 }
110}
111
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500112func ConsumerType(consumer int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.consumerType = consumer
115 }
116}
117
118func ProducerFlushFrequency(frequency int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushFrequency = frequency
121 }
122}
123
124func ProducerFlushMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMessages = num
127 }
128}
129
130func ProducerFlushMaxMessages(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerFlushMaxmessages = num
133 }
134}
135
khenaidoo90847922018-12-03 14:47:51 -0500136func ProducerMaxRetries(num int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryMax = num
139 }
140}
141
142func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerRetryBackOff = duration
145 }
146}
147
148func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500149 return func(args *SaramaClient) {
150 args.producerReturnErrors = opt
151 }
152}
153
khenaidoo90847922018-12-03 14:47:51 -0500154func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500155 return func(args *SaramaClient) {
156 args.producerReturnSuccess = opt
157 }
158}
159
160func ConsumerMaxWait(wait int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.consumerMaxwait = wait
163 }
164}
165
166func MaxProcessingTime(pTime int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.maxProcessingTime = pTime
169 }
170}
171
172func NumPartitions(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numPartitions = number
175 }
176}
177
178func NumReplicas(number int) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.numReplicas = number
181 }
182}
183
184func AutoCreateTopic(opt bool) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.autoCreateTopic = opt
187 }
188}
189
Abhilash S.L294ff522019-06-26 18:14:33 +0530190func MetadatMaxRetries(retry int) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.metadataMaxRetry = retry
193 }
194}
195
Scott Bakeree6a0872019-10-29 15:59:52 -0700196func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
197 return func(args *SaramaClient) {
198 args.livenessChannelInterval = opt
199 }
200}
201
khenaidoo43c82122018-11-22 18:38:28 -0500202func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
203 client := &SaramaClient{
204 KafkaHost: DefaultKafkaHost,
205 KafkaPort: DefaultKafkaPort,
206 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500207 client.consumerType = DefaultConsumerType
208 client.producerFlushFrequency = DefaultProducerFlushFrequency
209 client.producerFlushMessages = DefaultProducerFlushMessages
210 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
211 client.producerReturnErrors = DefaultProducerReturnErrors
212 client.producerReturnSuccess = DefaultProducerReturnSuccess
213 client.producerRetryMax = DefaultProducerRetryMax
214 client.producerRetryBackOff = DefaultProducerRetryBackoff
215 client.consumerMaxwait = DefaultConsumerMaxwait
216 client.maxProcessingTime = DefaultMaxProcessingTime
217 client.numPartitions = DefaultNumberPartitions
218 client.numReplicas = DefaultNumberReplicas
219 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530220 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700221 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500222
223 for _, option := range opts {
224 option(client)
225 }
226
khenaidooca301322019-01-09 23:06:32 -0500227 client.groupConsumers = make(map[string]*scc.Consumer)
228
khenaidoo43c82122018-11-22 18:38:28 -0500229 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500230 client.topicLockMap = make(map[string]*sync.RWMutex)
231 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500232 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700233
234 // alive until proven otherwise
235 client.alive = true
236
khenaidoo43c82122018-11-22 18:38:28 -0500237 return client
238}
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240func (sc *SaramaClient) Start() error {
241 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500246 var err error
247
khenaidoob3244212019-08-27 14:32:27 -0400248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
257 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
258 return err
259 }
260
khenaidoo43c82122018-11-22 18:38:28 -0500261 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500262 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500263 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
264 return err
265 }
266
khenaidooca301322019-01-09 23:06:32 -0500267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
270 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
271 return err
272 }
khenaidoo43c82122018-11-22 18:38:28 -0500273 }
274
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500275 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
khenaidooca301322019-01-09 23:06:32 -0500278 log.Info("kafka-sarama-client-started")
279
Scott Bakeree6a0872019-10-29 15:59:52 -0700280 sc.started = true
281
khenaidoo43c82122018-11-22 18:38:28 -0500282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
286 log.Info("stopping-sarama-client")
287
Scott Bakeree6a0872019-10-29 15:59:52 -0700288 sc.started = false
289
khenaidoo43c82122018-11-22 18:38:28 -0500290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
khenaidoo43c82122018-11-22 18:38:28 -0500293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500295 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500296 }
297 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298
khenaidoo43c82122018-11-22 18:38:28 -0500299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500301 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500302 }
303 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500304
khenaidooca301322019-01-09 23:06:32 -0500305 for key, val := range sc.groupConsumers {
306 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
307 if err := val.Close(); err != nil {
308 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500314 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500315 }
316 }
317
318 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500319 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500320
321 log.Info("sarama-client-stopped")
322}
323
khenaidooca301322019-01-09 23:06:32 -0500324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
338 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
339 return nil
340 }
341 log.Errorw("create-topic-failure", log.Fields{"error": err})
342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
346 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
347 return nil
348}
349
khenaidooca301322019-01-09 23:06:32 -0500350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
368 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
369 return nil
370 }
371 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
377 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500389 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
393 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
394 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500395 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500401 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500408 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
409 return nil, err
410 }
411 }
khenaidoo731697e2019-01-29 16:03:29 -0500412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500413 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
421 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
khenaidoo731697e2019-01-29 16:03:29 -0500434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500435 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500436 return nil, err
437 }
khenaidooca301322019-01-09 23:06:32 -0500438
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500439 } else {
440 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500452 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
455 log.Errorw("failed-removing-channel", log.Fields{"error": err})
456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
458 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
459 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500460 return err
461}
462
Scott Bakeree6a0872019-10-29 15:59:52 -0700463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
470 log.Info("update-liveness-channel-because-change")
471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
474 log.Info("update-liveness-channel-because-interval")
475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
482 log.Info("set-client-alive", log.Fields{"alive": alive})
483 sc.alive = alive
484 }
485}
486
Devmalya Paulc594bb32019-11-06 07:34:27 +0000487func (sc *SaramaClient) isLivenessError(err error) bool {
488 // Sarama producers and consumers encapsulate the error inside
489 // a ProducerError or ConsumerError struct.
490 if prodError, ok := err.(*sarama.ProducerError); ok {
491 err = prodError.Err
492 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
493 err = consumerError.Err
494 }
495
496 // Sarama-Cluster will compose the error into a ClusterError struct,
497 // which we can't do a compare by reference. To handle that, we the
498 // best we can do is compare the error strings.
499
500 switch err.Error() {
501 case context.DeadlineExceeded.Error():
502 log.Info("is-liveness-error-timeout")
503 return true
504 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
505 log.Info("is-liveness-error-no-brokers")
506 return true
507 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
508 log.Info("is-liveness-error-shutting-down")
509 return true
510 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
511 log.Info("is-liveness-error-not-available")
512 return true
513 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
514 log.Info("is-liveness-error-circuit-breaker-open")
515 return true
516 }
517
518 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
519 log.Info("is-liveness-error-connection-refused")
520 return true
521 }
522
523 // Other errors shouldn't trigger a loss of liveness
524
525 log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
526
527 return false
528}
529
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500530// send formats and sends the request onto the kafka messaging bus.
531func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
532
533 // Assert message is a proto message
534 var protoMsg proto.Message
535 var ok bool
536 // ascertain the value interface type is a proto.Message
537 if protoMsg, ok = msg.(proto.Message); !ok {
538 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
539 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
540 }
541
542 var marshalled []byte
543 var err error
544 // Create the Sarama producer message
545 if marshalled, err = proto.Marshal(protoMsg); err != nil {
546 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
547 return err
548 }
549 key := ""
550 if len(keys) > 0 {
551 key = keys[0] // Only the first key is relevant
552 }
553 kafkaMsg := &sarama.ProducerMessage{
554 Topic: topic.Name,
555 Key: sarama.StringEncoder(key),
556 Value: sarama.ByteEncoder(marshalled),
557 }
558
559 // Send message to kafka
560 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500561 // Wait for result
562 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
563 select {
564 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500565 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700566 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500567 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500568 log.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000569 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700570 sc.updateLiveness(false)
571 }
572 return notOk
573 }
574 return nil
575}
576
577// Enable the liveness monitor channel. This channel will report
578// a "true" or "false" on every publish, which indicates whether
579// or not the channel is still live. This channel is then picked up
580// by the service (i.e. rw_core / ro_core) to update readiness status
581// and/or take other actions.
582func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
583 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
584 if enable {
585 if sc.liveness == nil {
586 log.Info("kafka-create-liveness-channel")
587 // At least 1, so we can immediately post to it without blocking
588 // Setting a bigger number (10) allows the monitor to fall behind
589 // without blocking others. The monitor shouldn't really fall
590 // behind...
591 sc.liveness = make(chan bool, 10)
592 // post intial state to the channel
593 sc.liveness <- sc.alive
594 }
595 } else {
596 // TODO: Think about whether we need the ability to turn off
597 // liveness monitoring
598 panic("Turning off liveness reporting is not supported")
599 }
600 return sc.liveness
601}
602
603// send an empty message on the liveness channel to check whether connectivity has
604// been restored.
605func (sc *SaramaClient) SendLiveness() error {
606 if !sc.started {
607 return fmt.Errorf("SendLiveness() called while not started")
608 }
609
610 kafkaMsg := &sarama.ProducerMessage{
611 Topic: "_liveness_test",
612 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
613 }
614
615 // Send message to kafka
616 sc.producer.Input() <- kafkaMsg
617 // Wait for result
618 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
619 select {
620 case ok := <-sc.producer.Successes():
621 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
622 sc.updateLiveness(true)
623 case notOk := <-sc.producer.Errors():
624 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000625 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700626 sc.updateLiveness(false)
627 }
khenaidoo90847922018-12-03 14:47:51 -0500628 return notOk
629 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500630 return nil
631}
632
khenaidooca301322019-01-09 23:06:32 -0500633// getGroupId returns the group id from the key-value args.
634func getGroupId(kvArgs ...*KVArg) string {
635 for _, arg := range kvArgs {
636 if arg.Key == GroupIdKey {
637 return arg.Value.(string)
638 }
639 }
640 return ""
641}
642
khenaidoo731697e2019-01-29 16:03:29 -0500643// getOffset returns the offset from the key-value args.
644func getOffset(kvArgs ...*KVArg) int64 {
645 for _, arg := range kvArgs {
646 if arg.Key == Offset {
647 return arg.Value.(int64)
648 }
649 }
650 return sarama.OffsetNewest
651}
652
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500653func (sc *SaramaClient) createClusterAdmin() error {
654 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
655 config := sarama.NewConfig()
656 config.Version = sarama.V1_0_0_0
657
658 // Create a cluster Admin
659 var cAdmin sarama.ClusterAdmin
660 var err error
661 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
662 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
663 return err
664 }
665 sc.cAdmin = cAdmin
666 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500667}
668
khenaidood2b6df92018-12-13 16:37:20 -0500669func (sc *SaramaClient) lockTopic(topic *Topic) {
670 sc.lockOfTopicLockMap.Lock()
671 if _, exist := sc.topicLockMap[topic.Name]; exist {
672 sc.lockOfTopicLockMap.Unlock()
673 sc.topicLockMap[topic.Name].Lock()
674 } else {
675 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
676 sc.lockOfTopicLockMap.Unlock()
677 sc.topicLockMap[topic.Name].Lock()
678 }
679}
680
681func (sc *SaramaClient) unLockTopic(topic *Topic) {
682 sc.lockOfTopicLockMap.Lock()
683 defer sc.lockOfTopicLockMap.Unlock()
684 if _, exist := sc.topicLockMap[topic.Name]; exist {
685 sc.topicLockMap[topic.Name].Unlock()
686 }
687}
688
khenaidoo43c82122018-11-22 18:38:28 -0500689func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
690 sc.lockTopicToConsumerChannelMap.Lock()
691 defer sc.lockTopicToConsumerChannelMap.Unlock()
692 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
693 sc.topicToConsumerChannelMap[id] = arg
694 }
695}
696
697func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
698 sc.lockTopicToConsumerChannelMap.Lock()
699 defer sc.lockTopicToConsumerChannelMap.Unlock()
700 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
701 delete(sc.topicToConsumerChannelMap, id)
702 }
703}
704
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500705func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400706 sc.lockTopicToConsumerChannelMap.RLock()
707 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500708
709 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
710 return consumerCh
711 }
712 return nil
713}
714
khenaidoo79232702018-12-04 11:00:41 -0500715func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500716 sc.lockTopicToConsumerChannelMap.Lock()
717 defer sc.lockTopicToConsumerChannelMap.Unlock()
718 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
719 consumerCh.channels = append(consumerCh.channels, ch)
720 return
721 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500722 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
723}
724
725//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
726func closeConsumers(consumers []interface{}) error {
727 var err error
728 for _, consumer := range consumers {
729 // Is it a partition consumers?
730 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
731 if errTemp := partionConsumer.Close(); errTemp != nil {
732 log.Debugw("partition!!!", log.Fields{"err": errTemp})
733 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
734 // This can occur on race condition
735 err = nil
736 } else {
737 err = errTemp
738 }
739 }
740 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
741 if errTemp := groupConsumer.Close(); errTemp != nil {
742 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
743 // This can occur on race condition
744 err = nil
745 } else {
746 err = errTemp
747 }
748 }
749 }
750 }
751 return err
khenaidoo43c82122018-11-22 18:38:28 -0500752}
753
khenaidoo79232702018-12-04 11:00:41 -0500754func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500755 sc.lockTopicToConsumerChannelMap.Lock()
756 defer sc.lockTopicToConsumerChannelMap.Unlock()
757 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
758 // Channel will be closed in the removeChannel method
759 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500760 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500761 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500762 log.Debugw("closing-consumers", log.Fields{"topic": topic})
763 err := closeConsumers(consumerCh.consumers)
764 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500765 delete(sc.topicToConsumerChannelMap, topic.Name)
766 return err
767 }
768 return nil
769 }
770 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
771 return errors.New("topic-does-not-exist")
772}
773
774func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
775 sc.lockTopicToConsumerChannelMap.Lock()
776 defer sc.lockTopicToConsumerChannelMap.Unlock()
777 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
778 for _, ch := range consumerCh.channels {
779 // Channel will be closed in the removeChannel method
780 removeChannel(consumerCh.channels, ch)
781 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500782 err := closeConsumers(consumerCh.consumers)
783 //if err == sarama.ErrUnknownTopicOrPartition {
784 // // Not an error
785 // err = nil
786 //}
787 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500788 delete(sc.topicToConsumerChannelMap, topic.Name)
789 return err
790 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500791 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
792 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500793}
794
795func (sc *SaramaClient) clearConsumerChannelMap() error {
796 sc.lockTopicToConsumerChannelMap.Lock()
797 defer sc.lockTopicToConsumerChannelMap.Unlock()
798 var err error
799 for topic, consumerCh := range sc.topicToConsumerChannelMap {
800 for _, ch := range consumerCh.channels {
801 // Channel will be closed in the removeChannel method
802 removeChannel(consumerCh.channels, ch)
803 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500804 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
805 err = errTemp
806 }
807 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500808 delete(sc.topicToConsumerChannelMap, topic)
809 }
810 return err
811}
812
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500813//createPublisher creates the publisher which is used to send a message onto kafka
814func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500815 // This Creates the publisher
816 config := sarama.NewConfig()
817 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500818 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
819 config.Producer.Flush.Messages = sc.producerFlushMessages
820 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
821 config.Producer.Return.Errors = sc.producerReturnErrors
822 config.Producer.Return.Successes = sc.producerReturnSuccess
823 //config.Producer.RequiredAcks = sarama.WaitForAll
824 config.Producer.RequiredAcks = sarama.WaitForLocal
825
khenaidoo43c82122018-11-22 18:38:28 -0500826 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
827 brokers := []string{kafkaFullAddr}
828
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500829 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
830 log.Errorw("error-starting-publisher", log.Fields{"error": err})
831 return err
832 } else {
833 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500834 }
835 log.Info("Kafka-publisher-created")
836 return nil
837}
838
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500839func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500840 config := sarama.NewConfig()
841 config.Consumer.Return.Errors = true
842 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500843 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
844 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500845 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530846 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500847 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
848 brokers := []string{kafkaFullAddr}
849
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500850 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
851 log.Errorw("error-starting-consumers", log.Fields{"error": err})
852 return err
853 } else {
854 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500855 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500856 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500857 return nil
858}
859
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500860// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500861func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500862 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500863 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500864 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700865 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
866 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500867 //config.Group.Return.Notifications = false
868 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
869 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500870 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500871 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
khenaidoo43c82122018-11-22 18:38:28 -0500875 topics := []string{topic.Name}
876 var consumer *scc.Consumer
877 var err error
878
khenaidooca301322019-01-09 23:06:32 -0500879 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
880 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500881 return nil, err
882 }
khenaidooca301322019-01-09 23:06:32 -0500883 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500884
885 //sc.groupConsumers[topic.Name] = consumer
886 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500887 return consumer, nil
888}
889
khenaidoo43c82122018-11-22 18:38:28 -0500890// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500891// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500892func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500893 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400894 sc.lockTopicToConsumerChannelMap.RLock()
895 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500896 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500897 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500898 c <- protoMessage
899 }(ch)
900 }
901}
902
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500903func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
904 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500905startloop:
906 for {
907 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500908 case err, ok := <-consumer.Errors():
909 if ok {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500910 log.Warnw("partition-consumers-error", log.Fields{"error": err})
911 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500912 // Channel is closed
913 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500914 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500915 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500916 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500917 if !ok {
918 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500919 break startloop
920 }
921 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500922 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500923 if err := proto.Unmarshal(msgBody, icm); err != nil {
924 log.Warnw("partition-invalid-message", log.Fields{"error": err})
925 continue
926 }
927 go sc.dispatchToConsumers(consumerChnls, icm)
928 case <-sc.doneCh:
929 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
930 break startloop
931 }
932 }
933 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
934}
935
936func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
937 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
938
939startloop:
940 for {
941 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500942 case err, ok := <-consumer.Errors():
943 if ok {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000944 if sc.isLivenessError(err) {
945 sc.updateLiveness(false)
946 }
khenaidooca301322019-01-09 23:06:32 -0500947 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500948 } else {
Scott Bakeree6a0872019-10-29 15:59:52 -0700949 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500950 // channel is closed
951 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500952 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500953 case msg, ok := <-consumer.Messages():
954 if !ok {
Scott Bakeree6a0872019-10-29 15:59:52 -0700955 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500956 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500957 break startloop
958 }
Scott Bakeree6a0872019-10-29 15:59:52 -0700959 sc.updateLiveness(true)
khenaidoo297cd252019-02-07 22:10:23 -0500960 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500961 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500962 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500963 if err := proto.Unmarshal(msgBody, icm); err != nil {
964 log.Warnw("invalid-message", log.Fields{"error": err})
965 continue
966 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500967 go sc.dispatchToConsumers(consumerChnls, icm)
968 consumer.MarkOffset(msg, "")
969 case ntf := <-consumer.Notifications():
970 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500971 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500972 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500973 break startloop
974 }
975 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500976 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500977}
978
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500979func (sc *SaramaClient) startConsumers(topic *Topic) error {
980 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
981 var consumerCh *consumerChannels
982 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
983 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
984 return errors.New("consumers-not-exist")
985 }
986 // For each consumer listening for that topic, start a consumption loop
987 for _, consumer := range consumerCh.consumers {
988 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
989 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
990 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
991 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
992 } else {
993 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
994 return errors.New("invalid-consumer")
995 }
996 }
997 return nil
998}
999
1000//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1001//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -05001002func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001003 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001004 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001005
khenaidoo7ff26c72019-01-16 14:55:48 -05001006 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001007 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001008 return nil, err
1009 }
1010
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001011 consumersIf := make([]interface{}, 0)
1012 for _, pConsumer := range pConsumers {
1013 consumersIf = append(consumersIf, pConsumer)
1014 }
1015
1016 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001017 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001018 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001019 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001020 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001021 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001022 }
1023
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001024 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001025 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1026
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001027 //Start a consumers to listen on that specific topic
1028 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -05001029
1030 return consumerListeningChannel, nil
1031}
1032
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001033// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1034// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -05001035func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001036 // TODO: Replace this development partition consumers with a group consumers
1037 var pConsumer *scc.Consumer
1038 var err error
khenaidoo731697e2019-01-29 16:03:29 -05001039 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001040 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1041 return nil, err
1042 }
1043 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1044 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001045 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001046 cc := &consumerChannels{
1047 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001048 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001049 }
1050
1051 // Add the consumers channel to the map
1052 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1053
1054 //Start a consumers to listen on that specific topic
1055 go sc.startConsumers(topic)
1056
1057 return consumerListeningChannel, nil
1058}
1059
khenaidoo7ff26c72019-01-16 14:55:48 -05001060func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001061 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001062 partitionList, err := sc.consumer.Partitions(topic.Name)
1063 if err != nil {
1064 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1065 return nil, err
1066 }
1067
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001068 pConsumers := make([]sarama.PartitionConsumer, 0)
1069 for _, partition := range partitionList {
1070 var pConsumer sarama.PartitionConsumer
1071 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1072 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1073 return nil, err
1074 }
1075 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001076 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001077 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001078}
1079
khenaidoo79232702018-12-04 11:00:41 -05001080func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001081 var i int
khenaidoo79232702018-12-04 11:00:41 -05001082 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001083 for i, channel = range channels {
1084 if channel == ch {
1085 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1086 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001087 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001088 return channels[:len(channels)-1]
1089 }
1090 }
1091 return channels
1092}
khenaidoo7ff26c72019-01-16 14:55:48 -05001093
khenaidoo7ff26c72019-01-16 14:55:48 -05001094func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1095 sc.lockOfGroupConsumers.Lock()
1096 defer sc.lockOfGroupConsumers.Unlock()
1097 if _, exist := sc.groupConsumers[topic]; !exist {
1098 sc.groupConsumers[topic] = consumer
1099 }
1100}
1101
1102func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1103 sc.lockOfGroupConsumers.Lock()
1104 defer sc.lockOfGroupConsumers.Unlock()
1105 if _, exist := sc.groupConsumers[topic]; exist {
1106 consumer := sc.groupConsumers[topic]
1107 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001108 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -05001109 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1110 return err
1111 }
1112 }
1113 return nil
1114}