blob: ff521a734f3888ffd1efc919c06d6b6520da2755 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
Scott Bakerf2596722019-09-27 12:39:56 -070022 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050023 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000024 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050025 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070027 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Baker555307d2019-11-04 08:58:01 -080028 ic "github.com/opencord/voltha-protos/v2/go/inter_container"
Scott Bakerf2596722019-09-27 12:39:56 -070029 "strings"
30 "sync"
31 "time"
khenaidoo43c82122018-11-22 18:38:28 -050032)
33
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034func init() {
khenaidooca301322019-01-09 23:06:32 -050035 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050036}
37
// returnErrorFunction is a parameterless callback that reports its outcome as an error.
type returnErrorFunction func() error
39
40// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
41// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
42//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050043type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050044 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050045 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050046}
47
48// SaramaClient represents the messaging proxy
49type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050050 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050051 client sarama.Client
52 KafkaHost string
53 KafkaPort int
54 producer sarama.AsyncProducer
55 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050056 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040057 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050058 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050059 consumerType int
khenaidooca301322019-01-09 23:06:32 -050060 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050061 producerFlushFrequency int
62 producerFlushMessages int
63 producerFlushMaxmessages int
64 producerRetryMax int
65 producerRetryBackOff time.Duration
66 producerReturnSuccess bool
67 producerReturnErrors bool
68 consumerMaxwait int
69 maxProcessingTime int
70 numPartitions int
71 numReplicas int
72 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050073 doneCh chan int
74 topicToConsumerChannelMap map[string]*consumerChannels
75 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050076 topicLockMap map[string]*sync.RWMutex
77 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053078 metadataMaxRetry int
Scott Bakeree6a0872019-10-29 15:59:52 -070079 alive bool
80 liveness chan bool
81 livenessChannelInterval time.Duration
82 lastLivenessTime time.Time
83 started bool
khenaidoo43c82122018-11-22 18:38:28 -050084}
85
86type SaramaClientOption func(*SaramaClient)
87
88func Host(host string) SaramaClientOption {
89 return func(args *SaramaClient) {
90 args.KafkaHost = host
91 }
92}
93
94func Port(port int) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.KafkaPort = port
97 }
98}
99
khenaidooca301322019-01-09 23:06:32 -0500100func ConsumerGroupPrefix(prefix string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupPrefix = prefix
103 }
104}
105
106func ConsumerGroupName(name string) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerGroupName = name
109 }
110}
111
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500112func ConsumerType(consumer int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.consumerType = consumer
115 }
116}
117
118func ProducerFlushFrequency(frequency int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushFrequency = frequency
121 }
122}
123
124func ProducerFlushMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMessages = num
127 }
128}
129
130func ProducerFlushMaxMessages(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerFlushMaxmessages = num
133 }
134}
135
khenaidoo90847922018-12-03 14:47:51 -0500136func ProducerMaxRetries(num int) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryMax = num
139 }
140}
141
142func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerRetryBackOff = duration
145 }
146}
147
148func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500149 return func(args *SaramaClient) {
150 args.producerReturnErrors = opt
151 }
152}
153
khenaidoo90847922018-12-03 14:47:51 -0500154func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500155 return func(args *SaramaClient) {
156 args.producerReturnSuccess = opt
157 }
158}
159
160func ConsumerMaxWait(wait int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.consumerMaxwait = wait
163 }
164}
165
166func MaxProcessingTime(pTime int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.maxProcessingTime = pTime
169 }
170}
171
172func NumPartitions(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numPartitions = number
175 }
176}
177
178func NumReplicas(number int) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.numReplicas = number
181 }
182}
183
184func AutoCreateTopic(opt bool) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.autoCreateTopic = opt
187 }
188}
189
Abhilash S.L294ff522019-06-26 18:14:33 +0530190func MetadatMaxRetries(retry int) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.metadataMaxRetry = retry
193 }
194}
195
Scott Bakeree6a0872019-10-29 15:59:52 -0700196func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
197 return func(args *SaramaClient) {
198 args.livenessChannelInterval = opt
199 }
200}
201
khenaidoo43c82122018-11-22 18:38:28 -0500202func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
203 client := &SaramaClient{
204 KafkaHost: DefaultKafkaHost,
205 KafkaPort: DefaultKafkaPort,
206 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500207 client.consumerType = DefaultConsumerType
208 client.producerFlushFrequency = DefaultProducerFlushFrequency
209 client.producerFlushMessages = DefaultProducerFlushMessages
210 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
211 client.producerReturnErrors = DefaultProducerReturnErrors
212 client.producerReturnSuccess = DefaultProducerReturnSuccess
213 client.producerRetryMax = DefaultProducerRetryMax
214 client.producerRetryBackOff = DefaultProducerRetryBackoff
215 client.consumerMaxwait = DefaultConsumerMaxwait
216 client.maxProcessingTime = DefaultMaxProcessingTime
217 client.numPartitions = DefaultNumberPartitions
218 client.numReplicas = DefaultNumberReplicas
219 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530220 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700221 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500222
223 for _, option := range opts {
224 option(client)
225 }
226
khenaidooca301322019-01-09 23:06:32 -0500227 client.groupConsumers = make(map[string]*scc.Consumer)
228
khenaidoo43c82122018-11-22 18:38:28 -0500229 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500230 client.topicLockMap = make(map[string]*sync.RWMutex)
231 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500232 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700233
234 // alive until proven otherwise
235 client.alive = true
236
khenaidoo43c82122018-11-22 18:38:28 -0500237 return client
238}
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240func (sc *SaramaClient) Start() error {
241 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500246 var err error
247
khenaidoob3244212019-08-27 14:32:27 -0400248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
257 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
258 return err
259 }
260
khenaidoo43c82122018-11-22 18:38:28 -0500261 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500262 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500263 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
264 return err
265 }
266
khenaidooca301322019-01-09 23:06:32 -0500267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
270 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
271 return err
272 }
khenaidoo43c82122018-11-22 18:38:28 -0500273 }
274
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500275 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
khenaidooca301322019-01-09 23:06:32 -0500278 log.Info("kafka-sarama-client-started")
279
Scott Bakeree6a0872019-10-29 15:59:52 -0700280 sc.started = true
281
khenaidoo43c82122018-11-22 18:38:28 -0500282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
286 log.Info("stopping-sarama-client")
287
Scott Bakeree6a0872019-10-29 15:59:52 -0700288 sc.started = false
289
khenaidoo43c82122018-11-22 18:38:28 -0500290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
khenaidoo43c82122018-11-22 18:38:28 -0500293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500295 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500296 }
297 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298
khenaidoo43c82122018-11-22 18:38:28 -0500299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500301 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500302 }
303 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500304
khenaidooca301322019-01-09 23:06:32 -0500305 for key, val := range sc.groupConsumers {
306 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
307 if err := val.Close(); err != nil {
308 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500314 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500315 }
316 }
317
318 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500319 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500320
321 log.Info("sarama-client-stopped")
322}
323
khenaidooca301322019-01-09 23:06:32 -0500324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
338 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
339 return nil
340 }
341 log.Errorw("create-topic-failure", log.Fields{"error": err})
342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
346 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
347 return nil
348}
349
khenaidooca301322019-01-09 23:06:32 -0500350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
368 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
369 return nil
370 }
371 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
377 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500389 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
393 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
394 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500395 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500401 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500408 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
409 return nil, err
410 }
411 }
khenaidoo731697e2019-01-29 16:03:29 -0500412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500413 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
421 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
khenaidoo731697e2019-01-29 16:03:29 -0500434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500435 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500436 return nil, err
437 }
khenaidooca301322019-01-09 23:06:32 -0500438
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500439 } else {
440 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500452 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
455 log.Errorw("failed-removing-channel", log.Fields{"error": err})
456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
458 log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
459 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500460 return err
461}
462
Scott Bakeree6a0872019-10-29 15:59:52 -0700463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
470 log.Info("update-liveness-channel-because-change")
471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
474 log.Info("update-liveness-channel-because-interval")
475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
482 log.Info("set-client-alive", log.Fields{"alive": alive})
483 sc.alive = alive
484 }
485}
486
Devmalya Paulc594bb32019-11-06 07:34:27 +0000487func (sc *SaramaClient) isLivenessError(err error) bool {
488 // Sarama producers and consumers encapsulate the error inside
489 // a ProducerError or ConsumerError struct.
490 if prodError, ok := err.(*sarama.ProducerError); ok {
491 err = prodError.Err
492 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
493 err = consumerError.Err
494 }
495
496 // Sarama-Cluster will compose the error into a ClusterError struct,
497 // which we can't do a compare by reference. To handle that, we the
498 // best we can do is compare the error strings.
499
500 switch err.Error() {
501 case context.DeadlineExceeded.Error():
502 log.Info("is-liveness-error-timeout")
503 return true
504 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
505 log.Info("is-liveness-error-no-brokers")
506 return true
507 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
508 log.Info("is-liveness-error-shutting-down")
509 return true
510 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
511 log.Info("is-liveness-error-not-available")
512 return true
513 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
514 log.Info("is-liveness-error-circuit-breaker-open")
515 return true
516 }
517
518 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
519 log.Info("is-liveness-error-connection-refused")
520 return true
521 }
522
523 // Other errors shouldn't trigger a loss of liveness
524
525 log.Infow("is-liveness-error-ignored", log.Fields{"err": err})
526
527 return false
528}
529
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500530// send formats and sends the request onto the kafka messaging bus.
531func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
532
533 // Assert message is a proto message
534 var protoMsg proto.Message
535 var ok bool
536 // ascertain the value interface type is a proto.Message
537 if protoMsg, ok = msg.(proto.Message); !ok {
538 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
539 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
540 }
541
542 var marshalled []byte
543 var err error
544 // Create the Sarama producer message
545 if marshalled, err = proto.Marshal(protoMsg); err != nil {
546 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
547 return err
548 }
549 key := ""
550 if len(keys) > 0 {
551 key = keys[0] // Only the first key is relevant
552 }
553 kafkaMsg := &sarama.ProducerMessage{
554 Topic: topic.Name,
555 Key: sarama.StringEncoder(key),
556 Value: sarama.ByteEncoder(marshalled),
557 }
558
559 // Send message to kafka
560 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500561 // Wait for result
562 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
563 select {
564 case ok := <-sc.producer.Successes():
khenaidoo297cd252019-02-07 22:10:23 -0500565 log.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700566 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500567 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500568 log.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000569 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700570 sc.updateLiveness(false)
571 }
572 return notOk
573 }
574 return nil
575}
576
577// Enable the liveness monitor channel. This channel will report
578// a "true" or "false" on every publish, which indicates whether
579// or not the channel is still live. This channel is then picked up
580// by the service (i.e. rw_core / ro_core) to update readiness status
581// and/or take other actions.
582func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
583 log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
584 if enable {
585 if sc.liveness == nil {
586 log.Info("kafka-create-liveness-channel")
587 // At least 1, so we can immediately post to it without blocking
588 // Setting a bigger number (10) allows the monitor to fall behind
589 // without blocking others. The monitor shouldn't really fall
590 // behind...
591 sc.liveness = make(chan bool, 10)
592 // post intial state to the channel
593 sc.liveness <- sc.alive
594 }
595 } else {
596 // TODO: Think about whether we need the ability to turn off
597 // liveness monitoring
598 panic("Turning off liveness reporting is not supported")
599 }
600 return sc.liveness
601}
602
603// send an empty message on the liveness channel to check whether connectivity has
604// been restored.
605func (sc *SaramaClient) SendLiveness() error {
606 if !sc.started {
607 return fmt.Errorf("SendLiveness() called while not started")
608 }
609
610 kafkaMsg := &sarama.ProducerMessage{
611 Topic: "_liveness_test",
612 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
613 }
614
615 // Send message to kafka
616 sc.producer.Input() <- kafkaMsg
617 // Wait for result
618 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
619 select {
620 case ok := <-sc.producer.Successes():
621 log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
622 sc.updateLiveness(true)
623 case notOk := <-sc.producer.Errors():
624 log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000625 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700626 sc.updateLiveness(false)
627 }
khenaidoo90847922018-12-03 14:47:51 -0500628 return notOk
629 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500630 return nil
631}
632
khenaidooca301322019-01-09 23:06:32 -0500633// getGroupId returns the group id from the key-value args.
634func getGroupId(kvArgs ...*KVArg) string {
635 for _, arg := range kvArgs {
636 if arg.Key == GroupIdKey {
637 return arg.Value.(string)
638 }
639 }
640 return ""
641}
642
khenaidoo731697e2019-01-29 16:03:29 -0500643// getOffset returns the offset from the key-value args.
644func getOffset(kvArgs ...*KVArg) int64 {
645 for _, arg := range kvArgs {
646 if arg.Key == Offset {
647 return arg.Value.(int64)
648 }
649 }
650 return sarama.OffsetNewest
651}
652
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500653func (sc *SaramaClient) createClusterAdmin() error {
654 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
655 config := sarama.NewConfig()
656 config.Version = sarama.V1_0_0_0
657
658 // Create a cluster Admin
659 var cAdmin sarama.ClusterAdmin
660 var err error
661 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
662 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
663 return err
664 }
665 sc.cAdmin = cAdmin
666 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500667}
668
khenaidood2b6df92018-12-13 16:37:20 -0500669func (sc *SaramaClient) lockTopic(topic *Topic) {
670 sc.lockOfTopicLockMap.Lock()
671 if _, exist := sc.topicLockMap[topic.Name]; exist {
672 sc.lockOfTopicLockMap.Unlock()
673 sc.topicLockMap[topic.Name].Lock()
674 } else {
675 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
676 sc.lockOfTopicLockMap.Unlock()
677 sc.topicLockMap[topic.Name].Lock()
678 }
679}
680
681func (sc *SaramaClient) unLockTopic(topic *Topic) {
682 sc.lockOfTopicLockMap.Lock()
683 defer sc.lockOfTopicLockMap.Unlock()
684 if _, exist := sc.topicLockMap[topic.Name]; exist {
685 sc.topicLockMap[topic.Name].Unlock()
686 }
687}
688
khenaidoo43c82122018-11-22 18:38:28 -0500689func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
690 sc.lockTopicToConsumerChannelMap.Lock()
691 defer sc.lockTopicToConsumerChannelMap.Unlock()
692 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
693 sc.topicToConsumerChannelMap[id] = arg
694 }
695}
696
697func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
698 sc.lockTopicToConsumerChannelMap.Lock()
699 defer sc.lockTopicToConsumerChannelMap.Unlock()
700 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
701 delete(sc.topicToConsumerChannelMap, id)
702 }
703}
704
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500705func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400706 sc.lockTopicToConsumerChannelMap.RLock()
707 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500708
709 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
710 return consumerCh
711 }
712 return nil
713}
714
khenaidoo79232702018-12-04 11:00:41 -0500715func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500716 sc.lockTopicToConsumerChannelMap.Lock()
717 defer sc.lockTopicToConsumerChannelMap.Unlock()
718 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
719 consumerCh.channels = append(consumerCh.channels, ch)
720 return
721 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500722 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
723}
724
725//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
726func closeConsumers(consumers []interface{}) error {
727 var err error
728 for _, consumer := range consumers {
729 // Is it a partition consumers?
730 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
731 if errTemp := partionConsumer.Close(); errTemp != nil {
732 log.Debugw("partition!!!", log.Fields{"err": errTemp})
733 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
734 // This can occur on race condition
735 err = nil
736 } else {
737 err = errTemp
738 }
739 }
740 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
741 if errTemp := groupConsumer.Close(); errTemp != nil {
742 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
743 // This can occur on race condition
744 err = nil
745 } else {
746 err = errTemp
747 }
748 }
749 }
750 }
751 return err
khenaidoo43c82122018-11-22 18:38:28 -0500752}
753
khenaidoo79232702018-12-04 11:00:41 -0500754func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500755 sc.lockTopicToConsumerChannelMap.Lock()
756 defer sc.lockTopicToConsumerChannelMap.Unlock()
757 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
758 // Channel will be closed in the removeChannel method
759 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500760 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500761 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500762 log.Debugw("closing-consumers", log.Fields{"topic": topic})
763 err := closeConsumers(consumerCh.consumers)
764 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500765 delete(sc.topicToConsumerChannelMap, topic.Name)
766 return err
767 }
768 return nil
769 }
770 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
771 return errors.New("topic-does-not-exist")
772}
773
774func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
775 sc.lockTopicToConsumerChannelMap.Lock()
776 defer sc.lockTopicToConsumerChannelMap.Unlock()
777 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
778 for _, ch := range consumerCh.channels {
779 // Channel will be closed in the removeChannel method
780 removeChannel(consumerCh.channels, ch)
781 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500782 err := closeConsumers(consumerCh.consumers)
783 //if err == sarama.ErrUnknownTopicOrPartition {
784 // // Not an error
785 // err = nil
786 //}
787 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500788 delete(sc.topicToConsumerChannelMap, topic.Name)
789 return err
790 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500791 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
792 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500793}
794
795func (sc *SaramaClient) clearConsumerChannelMap() error {
796 sc.lockTopicToConsumerChannelMap.Lock()
797 defer sc.lockTopicToConsumerChannelMap.Unlock()
798 var err error
799 for topic, consumerCh := range sc.topicToConsumerChannelMap {
800 for _, ch := range consumerCh.channels {
801 // Channel will be closed in the removeChannel method
802 removeChannel(consumerCh.channels, ch)
803 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500804 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
805 err = errTemp
806 }
807 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500808 delete(sc.topicToConsumerChannelMap, topic)
809 }
810 return err
811}
812
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500813//createPublisher creates the publisher which is used to send a message onto kafka
814func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500815 // This Creates the publisher
816 config := sarama.NewConfig()
817 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500818 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
819 config.Producer.Flush.Messages = sc.producerFlushMessages
820 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
821 config.Producer.Return.Errors = sc.producerReturnErrors
822 config.Producer.Return.Successes = sc.producerReturnSuccess
823 //config.Producer.RequiredAcks = sarama.WaitForAll
824 config.Producer.RequiredAcks = sarama.WaitForLocal
825
khenaidoo43c82122018-11-22 18:38:28 -0500826 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
827 brokers := []string{kafkaFullAddr}
828
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500829 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
830 log.Errorw("error-starting-publisher", log.Fields{"error": err})
831 return err
832 } else {
833 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500834 }
835 log.Info("Kafka-publisher-created")
836 return nil
837}
838
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500839func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500840 config := sarama.NewConfig()
841 config.Consumer.Return.Errors = true
842 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500843 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
844 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500845 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530846 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500847 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
848 brokers := []string{kafkaFullAddr}
849
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500850 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
851 log.Errorw("error-starting-consumers", log.Fields{"error": err})
852 return err
853 } else {
854 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500855 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500856 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500857 return nil
858}
859
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500860// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500861func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500862 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500863 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500864 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700865 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
866 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500867 //config.Group.Return.Notifications = false
868 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
869 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500870 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500871 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
khenaidoo43c82122018-11-22 18:38:28 -0500875 topics := []string{topic.Name}
876 var consumer *scc.Consumer
877 var err error
878
khenaidooca301322019-01-09 23:06:32 -0500879 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
880 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500881 return nil, err
882 }
khenaidooca301322019-01-09 23:06:32 -0500883 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500884
885 //sc.groupConsumers[topic.Name] = consumer
886 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500887 return consumer, nil
888}
889
khenaidoo43c82122018-11-22 18:38:28 -0500890// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500891// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500892func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500893 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400894 sc.lockTopicToConsumerChannelMap.RLock()
895 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500896 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500897 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500898 c <- protoMessage
899 }(ch)
900 }
901}
902
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500903func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
904 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500905startloop:
906 for {
907 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500908 case err, ok := <-consumer.Errors():
909 if ok {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500910 if sc.isLivenessError(err) {
911 sc.updateLiveness(false)
912 log.Warnw("partition-consumers-error", log.Fields{"error": err})
913 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500914 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500915 // Channel is closed
916 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500917 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500918 case msg, ok := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500919 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500920 if !ok {
921 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500922 break startloop
923 }
924 msgBody := msg.Value
khenaidoo6e55d9e2019-12-12 18:26:26 -0500925 sc.updateLiveness(true)
926 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500927 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500928 if err := proto.Unmarshal(msgBody, icm); err != nil {
929 log.Warnw("partition-invalid-message", log.Fields{"error": err})
930 continue
931 }
932 go sc.dispatchToConsumers(consumerChnls, icm)
933 case <-sc.doneCh:
934 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
935 break startloop
936 }
937 }
938 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
939}
940
941func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
942 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
943
944startloop:
945 for {
946 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500947 case err, ok := <-consumer.Errors():
948 if ok {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000949 if sc.isLivenessError(err) {
950 sc.updateLiveness(false)
951 }
khenaidooca301322019-01-09 23:06:32 -0500952 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500953 } else {
Scott Bakeree6a0872019-10-29 15:59:52 -0700954 log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500955 // channel is closed
956 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500957 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500958 case msg, ok := <-consumer.Messages():
959 if !ok {
Scott Bakeree6a0872019-10-29 15:59:52 -0700960 log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500961 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500962 break startloop
963 }
Scott Bakeree6a0872019-10-29 15:59:52 -0700964 sc.updateLiveness(true)
khenaidoo297cd252019-02-07 22:10:23 -0500965 log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500966 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500967 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500968 if err := proto.Unmarshal(msgBody, icm); err != nil {
969 log.Warnw("invalid-message", log.Fields{"error": err})
970 continue
971 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500972 go sc.dispatchToConsumers(consumerChnls, icm)
973 consumer.MarkOffset(msg, "")
974 case ntf := <-consumer.Notifications():
975 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500976 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500977 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500978 break startloop
979 }
980 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500981 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500982}
983
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500984func (sc *SaramaClient) startConsumers(topic *Topic) error {
985 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
986 var consumerCh *consumerChannels
987 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
988 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
989 return errors.New("consumers-not-exist")
990 }
991 // For each consumer listening for that topic, start a consumption loop
992 for _, consumer := range consumerCh.consumers {
993 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
994 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
995 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
996 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
997 } else {
998 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
999 return errors.New("invalid-consumer")
1000 }
1001 }
1002 return nil
1003}
1004
1005//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1006//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -05001007func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001008 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001009 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001010
khenaidoo7ff26c72019-01-16 14:55:48 -05001011 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001012 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001013 return nil, err
1014 }
1015
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001016 consumersIf := make([]interface{}, 0)
1017 for _, pConsumer := range pConsumers {
1018 consumersIf = append(consumersIf, pConsumer)
1019 }
1020
1021 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001022 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001023 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001024 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001025 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001026 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001027 }
1028
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001029 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001030 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1031
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001032 //Start a consumers to listen on that specific topic
1033 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -05001034
1035 return consumerListeningChannel, nil
1036}
1037
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001038// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1039// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -05001040func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001041 // TODO: Replace this development partition consumers with a group consumers
1042 var pConsumer *scc.Consumer
1043 var err error
khenaidoo731697e2019-01-29 16:03:29 -05001044 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001045 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
1046 return nil, err
1047 }
1048 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1049 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001050 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001051 cc := &consumerChannels{
1052 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001053 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001054 }
1055
1056 // Add the consumers channel to the map
1057 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1058
1059 //Start a consumers to listen on that specific topic
1060 go sc.startConsumers(topic)
1061
1062 return consumerListeningChannel, nil
1063}
1064
khenaidoo7ff26c72019-01-16 14:55:48 -05001065func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001066 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001067 partitionList, err := sc.consumer.Partitions(topic.Name)
1068 if err != nil {
1069 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1070 return nil, err
1071 }
1072
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001073 pConsumers := make([]sarama.PartitionConsumer, 0)
1074 for _, partition := range partitionList {
1075 var pConsumer sarama.PartitionConsumer
1076 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
1077 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
1078 return nil, err
1079 }
1080 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001081 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001082 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001083}
1084
khenaidoo79232702018-12-04 11:00:41 -05001085func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001086 var i int
khenaidoo79232702018-12-04 11:00:41 -05001087 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001088 for i, channel = range channels {
1089 if channel == ch {
1090 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1091 close(channel)
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001092 log.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001093 return channels[:len(channels)-1]
1094 }
1095 }
1096 return channels
1097}
khenaidoo7ff26c72019-01-16 14:55:48 -05001098
khenaidoo7ff26c72019-01-16 14:55:48 -05001099func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1100 sc.lockOfGroupConsumers.Lock()
1101 defer sc.lockOfGroupConsumers.Unlock()
1102 if _, exist := sc.groupConsumers[topic]; !exist {
1103 sc.groupConsumers[topic] = consumer
1104 }
1105}
1106
1107func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1108 sc.lockOfGroupConsumers.Lock()
1109 defer sc.lockOfGroupConsumers.Unlock()
1110 if _, exist := sc.groupConsumers[topic]; exist {
1111 consumer := sc.groupConsumers[topic]
1112 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001113 if err := consumer.Close(); err != nil {
khenaidoo7ff26c72019-01-16 14:55:48 -05001114 log.Errorw("failure-closing-consumer", log.Fields{"error": err})
1115 return err
1116 }
1117 }
1118 return nil
1119}