blob: 1e4efae47e2aecb7b9cb8f36f9ac0116e44280b3 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Pauldd23a992019-11-14 07:06:31 +000019 "context"
William Kurkianea869482019-04-09 15:16:11 -040020 "errors"
21 "fmt"
Esin Karamanccb714b2019-11-29 15:02:06 +000022 "strings"
23 "sync"
24 "time"
25
kdarapub26b4502019-10-05 03:02:33 +053026 "github.com/Shopify/sarama"
William Kurkianea869482019-04-09 15:16:11 -040027 scc "github.com/bsm/sarama-cluster"
Devmalya Pauldd23a992019-11-14 07:06:31 +000028 "github.com/eapache/go-resiliency/breaker"
William Kurkianea869482019-04-09 15:16:11 -040029 "github.com/golang/protobuf/proto"
Scott Bakered4a8e72020-04-17 11:10:20 -070030 "github.com/golang/protobuf/ptypes"
William Kurkianea869482019-04-09 15:16:11 -040031 "github.com/google/uuid"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070032 "github.com/opencord/voltha-lib-go/v4/pkg/log"
33 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040034)
35
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcast the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	// consumers holds the consumer handles; it is untyped because the entries may be
	// partition consumers or group consumers.
	consumers []interface{}
	// channels are the subscriber channels that receive every message read from the topic.
	channels []chan *ic.InterContainerMessage
}
43
npujarec5762e2020-01-01 14:08:48 +053044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
// SaramaClient represents the messaging proxy. It bundles the sarama cluster admin,
// producer and consumers with the tunables applied through SaramaClientOption
// functions and with the liveness/healthiness reporting state.
type SaramaClient struct {
	// Kafka handles. cAdmin creates and deletes topics, KafkaAddress is the broker
	// address used when connecting, producer is written to by Send and SendLiveness,
	// and groupConsumers is keyed by topic name.
	cAdmin               sarama.ClusterAdmin
	KafkaAddress         string
	producer             sarama.AsyncProducer
	consumer             sarama.Consumer
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex

	// Consumer identity. consumerGroupPrefix is prepended to every group id built in Subscribe.
	consumerGroupPrefix string
	consumerType        int
	consumerGroupName   string

	// Producer tunables, set through the Producer* options.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool

	// Consumer and topic tunables. numPartitions, numReplicas and autoCreateTopic are
	// used by Subscribe when it creates a topic on behalf of a partition consumer.
	consumerMaxwait   int
	maxProcessingTime int
	numPartitions     int
	numReplicas       int
	autoCreateTopic   bool

	// doneCh is created by Start and signalled by Stop; metadataCallback is the
	// callback registered through SubscribeForMetadata.
	doneCh           chan int
	metadataCallback func(fromTopic string, timestamp time.Time)

	// Per-topic bookkeeping. topicLockMap holds one lock per topic name and is itself
	// guarded by lockOfTopicLockMap.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int

	// Liveness reporting. liveness is nil until EnableLivenessChannel is called;
	// lastLivenessTime and livenessChannelInterval rate-limit repeated postings.
	// started is set by Start and cleared by Stop.
	alive                   bool
	livenessMutex           sync.Mutex
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	started                 bool

	// Healthiness reporting. healthiness is nil until EnableHealthinessChannel is called.
	healthinessMutex sync.Mutex
	healthy          bool
	healthiness      chan bool
}
87
// SaramaClientOption is a functional option that mutates a SaramaClient while it is
// being built by NewSaramaClient.
type SaramaClientOption func(*SaramaClient)
89
Neha Sharma3f221ae2020-04-29 19:02:12 +000090func Address(address string) SaramaClientOption {
William Kurkianea869482019-04-09 15:16:11 -040091 return func(args *SaramaClient) {
Neha Sharma3f221ae2020-04-29 19:02:12 +000092 args.KafkaAddress = address
William Kurkianea869482019-04-09 15:16:11 -040093 }
94}
95
96func ConsumerGroupPrefix(prefix string) SaramaClientOption {
97 return func(args *SaramaClient) {
98 args.consumerGroupPrefix = prefix
99 }
100}
101
102func ConsumerGroupName(name string) SaramaClientOption {
103 return func(args *SaramaClient) {
104 args.consumerGroupName = name
105 }
106}
107
108func ConsumerType(consumer int) SaramaClientOption {
109 return func(args *SaramaClient) {
110 args.consumerType = consumer
111 }
112}
113
114func ProducerFlushFrequency(frequency int) SaramaClientOption {
115 return func(args *SaramaClient) {
116 args.producerFlushFrequency = frequency
117 }
118}
119
120func ProducerFlushMessages(num int) SaramaClientOption {
121 return func(args *SaramaClient) {
122 args.producerFlushMessages = num
123 }
124}
125
126func ProducerFlushMaxMessages(num int) SaramaClientOption {
127 return func(args *SaramaClient) {
128 args.producerFlushMaxmessages = num
129 }
130}
131
132func ProducerMaxRetries(num int) SaramaClientOption {
133 return func(args *SaramaClient) {
134 args.producerRetryMax = num
135 }
136}
137
138func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
139 return func(args *SaramaClient) {
140 args.producerRetryBackOff = duration
141 }
142}
143
144func ProducerReturnOnErrors(opt bool) SaramaClientOption {
145 return func(args *SaramaClient) {
146 args.producerReturnErrors = opt
147 }
148}
149
150func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
151 return func(args *SaramaClient) {
152 args.producerReturnSuccess = opt
153 }
154}
155
156func ConsumerMaxWait(wait int) SaramaClientOption {
157 return func(args *SaramaClient) {
158 args.consumerMaxwait = wait
159 }
160}
161
162func MaxProcessingTime(pTime int) SaramaClientOption {
163 return func(args *SaramaClient) {
164 args.maxProcessingTime = pTime
165 }
166}
167
168func NumPartitions(number int) SaramaClientOption {
169 return func(args *SaramaClient) {
170 args.numPartitions = number
171 }
172}
173
174func NumReplicas(number int) SaramaClientOption {
175 return func(args *SaramaClient) {
176 args.numReplicas = number
177 }
178}
179
180func AutoCreateTopic(opt bool) SaramaClientOption {
181 return func(args *SaramaClient) {
182 args.autoCreateTopic = opt
183 }
184}
185
Mahir Gunyele77977b2019-06-27 05:36:22 -0700186func MetadatMaxRetries(retry int) SaramaClientOption {
187 return func(args *SaramaClient) {
188 args.metadataMaxRetry = retry
189 }
190}
191
cbabu95f21522019-11-13 14:25:18 +0100192func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
193 return func(args *SaramaClient) {
194 args.livenessChannelInterval = opt
195 }
196}
197
William Kurkianea869482019-04-09 15:16:11 -0400198func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
199 client := &SaramaClient{
Neha Sharma3f221ae2020-04-29 19:02:12 +0000200 KafkaAddress: DefaultKafkaAddress,
William Kurkianea869482019-04-09 15:16:11 -0400201 }
202 client.consumerType = DefaultConsumerType
203 client.producerFlushFrequency = DefaultProducerFlushFrequency
204 client.producerFlushMessages = DefaultProducerFlushMessages
205 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
206 client.producerReturnErrors = DefaultProducerReturnErrors
207 client.producerReturnSuccess = DefaultProducerReturnSuccess
208 client.producerRetryMax = DefaultProducerRetryMax
209 client.producerRetryBackOff = DefaultProducerRetryBackoff
210 client.consumerMaxwait = DefaultConsumerMaxwait
211 client.maxProcessingTime = DefaultMaxProcessingTime
212 client.numPartitions = DefaultNumberPartitions
213 client.numReplicas = DefaultNumberReplicas
214 client.autoCreateTopic = DefaultAutoCreateTopic
Mahir Gunyele77977b2019-06-27 05:36:22 -0700215 client.metadataMaxRetry = DefaultMetadataMaxRetry
cbabu95f21522019-11-13 14:25:18 +0100216 client.livenessChannelInterval = DefaultLivenessChannelInterval
William Kurkianea869482019-04-09 15:16:11 -0400217
218 for _, option := range opts {
219 option(client)
220 }
221
222 client.groupConsumers = make(map[string]*scc.Consumer)
223
224 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
225 client.topicLockMap = make(map[string]*sync.RWMutex)
226 client.lockOfTopicLockMap = sync.RWMutex{}
227 client.lockOfGroupConsumers = sync.RWMutex{}
cbabu95f21522019-11-13 14:25:18 +0100228
Scott Baker86fce9a2019-12-12 09:47:17 -0800229 // healthy and alive until proven otherwise
cbabu95f21522019-11-13 14:25:18 +0100230 client.alive = true
Scott Baker86fce9a2019-12-12 09:47:17 -0800231 client.healthy = true
cbabu95f21522019-11-13 14:25:18 +0100232
William Kurkianea869482019-04-09 15:16:11 -0400233 return client
234}
235
Neha Sharma96b7bf22020-06-15 10:37:32 +0000236func (sc *SaramaClient) Start(ctx context.Context) error {
237 logger.Info(ctx, "Starting-kafka-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400238
239 // Create the Done channel
240 sc.doneCh = make(chan int, 1)
241
242 var err error
243
Devmalya Paul495b94a2019-08-27 19:42:00 -0400244 // Add a cleanup in case of failure to startup
245 defer func() {
246 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000247 sc.Stop(ctx)
Devmalya Paul495b94a2019-08-27 19:42:00 -0400248 }
249 }()
250
William Kurkianea869482019-04-09 15:16:11 -0400251 // Create the Cluster Admin
Neha Sharma96b7bf22020-06-15 10:37:32 +0000252 if err = sc.createClusterAdmin(ctx); err != nil {
253 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400254 return err
255 }
256
257 // Create the Publisher
Neha Sharma96b7bf22020-06-15 10:37:32 +0000258 if err := sc.createPublisher(ctx); err != nil {
259 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400260 return err
261 }
262
263 if sc.consumerType == DefaultConsumerType {
264 // Create the master consumers
Neha Sharma96b7bf22020-06-15 10:37:32 +0000265 if err := sc.createConsumer(ctx); err != nil {
266 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400267 return err
268 }
269 }
270
271 // Create the topic to consumers/channel map
272 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
273
Neha Sharma96b7bf22020-06-15 10:37:32 +0000274 logger.Info(ctx, "kafka-sarama-client-started")
William Kurkianea869482019-04-09 15:16:11 -0400275
cbabu95f21522019-11-13 14:25:18 +0100276 sc.started = true
277
William Kurkianea869482019-04-09 15:16:11 -0400278 return nil
279}
280
Neha Sharma96b7bf22020-06-15 10:37:32 +0000281func (sc *SaramaClient) Stop(ctx context.Context) {
282 logger.Info(ctx, "stopping-sarama-client")
William Kurkianea869482019-04-09 15:16:11 -0400283
cbabu95f21522019-11-13 14:25:18 +0100284 sc.started = false
285
William Kurkianea869482019-04-09 15:16:11 -0400286 //Send a message over the done channel to close all long running routines
287 sc.doneCh <- 1
288
289 if sc.producer != nil {
290 if err := sc.producer.Close(); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000291 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400292 }
293 }
294
295 if sc.consumer != nil {
296 if err := sc.consumer.Close(); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000297 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400298 }
299 }
300
301 for key, val := range sc.groupConsumers {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000302 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400303 if err := val.Close(); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000304 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
William Kurkianea869482019-04-09 15:16:11 -0400305 }
306 }
307
308 if sc.cAdmin != nil {
309 if err := sc.cAdmin.Close(); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000310 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400311 }
312 }
313
314 //TODO: Clear the consumers map
315 //sc.clearConsumerChannelMap()
316
Neha Sharma96b7bf22020-06-15 10:37:32 +0000317 logger.Info(ctx, "sarama-client-stopped")
William Kurkianea869482019-04-09 15:16:11 -0400318}
319
320//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
321// the invoking function must hold the lock
Neha Sharma96b7bf22020-06-15 10:37:32 +0000322func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
William Kurkianea869482019-04-09 15:16:11 -0400323 // Set the topic details
324 topicDetail := &sarama.TopicDetail{}
325 topicDetail.NumPartitions = int32(numPartition)
326 topicDetail.ReplicationFactor = int16(repFactor)
327 topicDetail.ConfigEntries = make(map[string]*string)
328 topicDetails := make(map[string]*sarama.TopicDetail)
329 topicDetails[topic.Name] = topicDetail
330
331 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
332 if err == sarama.ErrTopicAlreadyExists {
333 // Not an error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000334 logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400335 return nil
336 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000337 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400338 return err
339 }
340 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
341 // do so.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000342 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
William Kurkianea869482019-04-09 15:16:11 -0400343 return nil
344}
345
// CreateTopic is a public API to create a topic on the Kafka Broker. It takes the lock of that
// specific topic for the duration of the call to ensure no two go routines are performing
// operations on the same topic, then delegates to createTopic.
//
// It returns nil when the topic was created or already existed, otherwise the error
// reported by createTopic.
func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
	sc.lockTopic(topic)
	// Deferred so the topic lock is released even if createTopic panics.
	defer sc.unLockTopic(topic)

	return sc.createTopic(ctx, topic, numPartition, repFactor)
}
354
355//DeleteTopic removes a topic from the kafka Broker
Neha Sharma96b7bf22020-06-15 10:37:32 +0000356func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
360 // Remove the topic from the broker
361 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
362 if err == sarama.ErrUnknownTopicOrPartition {
363 // Not an error as does not exist
Neha Sharma96b7bf22020-06-15 10:37:32 +0000364 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400365 return nil
366 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000367 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400368 return err
369 }
370
371 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000372 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
373 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400374 return err
375 }
376 return nil
377}
378
379// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
380// messages from that topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000381func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -0400382 sc.lockTopic(topic)
383 defer sc.unLockTopic(topic)
384
Neha Sharma96b7bf22020-06-15 10:37:32 +0000385 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400386
387 // If a consumers already exist for that topic then resuse it
388 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000389 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400390 // Create a channel specific for that consumers and add it to the consumers channel map
391 ch := make(chan *ic.InterContainerMessage)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000392 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
William Kurkianea869482019-04-09 15:16:11 -0400393 return ch, nil
394 }
395
396 // Register for the topic and set it up
397 var consumerListeningChannel chan *ic.InterContainerMessage
398 var err error
399
400 // Use the consumerType option to figure out the type of consumer to launch
401 if sc.consumerType == PartitionConsumer {
402 if sc.autoCreateTopic {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000403 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
404 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400405 return nil, err
406 }
407 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000408 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
409 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400410 return nil, err
411 }
412 } else if sc.consumerType == GroupCustomer {
413 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
414 // does not consume from a precreated topic in some scenarios
415 //if sc.autoCreateTopic {
416 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000417 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400418 // return nil, err
419 // }
420 //}
421 //groupId := sc.consumerGroupName
422 groupId := getGroupId(kvArgs...)
423 // Include the group prefix
424 if groupId != "" {
425 groupId = sc.consumerGroupPrefix + groupId
426 } else {
427 // Need to use a unique group Id per topic
428 groupId = sc.consumerGroupPrefix + topic.Name
429 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000430 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
431 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400432 return nil, err
433 }
434
435 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000436 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
William Kurkianea869482019-04-09 15:16:11 -0400437 return nil, errors.New("unknown-consumer-type")
438 }
439
440 return consumerListeningChannel, nil
441}
442
443//UnSubscribe unsubscribe a consumer from a given topic
Neha Sharma96b7bf22020-06-15 10:37:32 +0000444func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
William Kurkianea869482019-04-09 15:16:11 -0400445 sc.lockTopic(topic)
446 defer sc.unLockTopic(topic)
447
Neha Sharma96b7bf22020-06-15 10:37:32 +0000448 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400449 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +0000450 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
451 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400452 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000453 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
454 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400455 }
456 return err
457}
458
// SubscribeForMetadata registers callback as the client's metadata callback,
// replacing any callback registered earlier. The callback takes the topic a
// message came from and a timestamp. ctx is currently unused.
func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
	sc.metadataCallback = callback
}
462
Neha Sharma96b7bf22020-06-15 10:37:32 +0000463func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
cbabu95f21522019-11-13 14:25:18 +0100464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
Girish Kumar935f7af2020-08-18 11:59:42 +0000468 sc.livenessMutex.Lock()
469 defer sc.livenessMutex.Unlock()
cbabu95f21522019-11-13 14:25:18 +0100470 if sc.liveness != nil {
471 if sc.alive != alive {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000472 logger.Info(ctx, "update-liveness-channel-because-change")
cbabu95f21522019-11-13 14:25:18 +0100473 sc.liveness <- alive
474 sc.lastLivenessTime = time.Now()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000475 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000476 logger.Info(ctx, "update-liveness-channel-because-interval")
cbabu95f21522019-11-13 14:25:18 +0100477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
479 }
480 }
481
482 // Only emit a log message when the state changes
483 if sc.alive != alive {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000484 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
cbabu95f21522019-11-13 14:25:18 +0100485 sc.alive = alive
486 }
487}
488
Scott Baker86fce9a2019-12-12 09:47:17 -0800489// Once unhealthy, we never go back
Neha Sharma96b7bf22020-06-15 10:37:32 +0000490func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
Scott Baker86fce9a2019-12-12 09:47:17 -0800491 sc.healthy = false
Girish Kumar935f7af2020-08-18 11:59:42 +0000492 sc.healthinessMutex.Lock()
493 defer sc.healthinessMutex.Unlock()
Scott Baker86fce9a2019-12-12 09:47:17 -0800494 if sc.healthiness != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000495 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker86fce9a2019-12-12 09:47:17 -0800496 sc.healthiness <- sc.healthy
497 }
498}
499
Neha Sharma96b7bf22020-06-15 10:37:32 +0000500func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Devmalya Pauldd23a992019-11-14 07:06:31 +0000501 // Sarama producers and consumers encapsulate the error inside
502 // a ProducerError or ConsumerError struct.
503 if prodError, ok := err.(*sarama.ProducerError); ok {
504 err = prodError.Err
505 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
506 err = consumerError.Err
507 }
508
509 // Sarama-Cluster will compose the error into a ClusterError struct,
510 // which we can't do a compare by reference. To handle that, we the
511 // best we can do is compare the error strings.
512
513 switch err.Error() {
514 case context.DeadlineExceeded.Error():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000515 logger.Info(ctx, "is-liveness-error-timeout")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000516 return true
517 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000518 logger.Info(ctx, "is-liveness-error-no-brokers")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000519 return true
520 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000521 logger.Info(ctx, "is-liveness-error-shutting-down")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000522 return true
523 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000524 logger.Info(ctx, "is-liveness-error-not-available")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000525 return true
526 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000527 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000528 return true
529 }
530
531 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000532 logger.Info(ctx, "is-liveness-error-connection-refused")
Devmalya Pauldd23a992019-11-14 07:06:31 +0000533 return true
534 }
535
Scott Bakeree7c0a02020-01-07 11:12:26 -0800536 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Neha Sharma96b7bf22020-06-15 10:37:32 +0000537 logger.Info(ctx, "is-liveness-error-io-timeout")
Scott Bakeree7c0a02020-01-07 11:12:26 -0800538 return true
539 }
540
Devmalya Pauldd23a992019-11-14 07:06:31 +0000541 // Other errors shouldn't trigger a loss of liveness
542
Neha Sharma96b7bf22020-06-15 10:37:32 +0000543 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Pauldd23a992019-11-14 07:06:31 +0000544
545 return false
546}
547
William Kurkianea869482019-04-09 15:16:11 -0400548// send formats and sends the request onto the kafka messaging bus.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000549func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
William Kurkianea869482019-04-09 15:16:11 -0400550
551 // Assert message is a proto message
552 var protoMsg proto.Message
553 var ok bool
554 // ascertain the value interface type is a proto.Message
555 if protoMsg, ok = msg.(proto.Message); !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000556 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000557 return fmt.Errorf("not-a-proto-msg-%s", msg)
William Kurkianea869482019-04-09 15:16:11 -0400558 }
559
560 var marshalled []byte
561 var err error
562 // Create the Sarama producer message
563 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000564 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400565 return err
566 }
567 key := ""
568 if len(keys) > 0 {
569 key = keys[0] // Only the first key is relevant
570 }
571 kafkaMsg := &sarama.ProducerMessage{
572 Topic: topic.Name,
573 Key: sarama.StringEncoder(key),
574 Value: sarama.ByteEncoder(marshalled),
575 }
576
577 // Send message to kafka
578 sc.producer.Input() <- kafkaMsg
William Kurkianea869482019-04-09 15:16:11 -0400579 // Wait for result
580 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
581 select {
582 case ok := <-sc.producer.Successes():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000583 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
584 sc.updateLiveness(ctx, true)
William Kurkianea869482019-04-09 15:16:11 -0400585 case notOk := <-sc.producer.Errors():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000586 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
587 if sc.isLivenessError(ctx, notOk) {
588 sc.updateLiveness(ctx, false)
cbabu95f21522019-11-13 14:25:18 +0100589 }
590 return notOk
591 }
592 return nil
593}
594
595// Enable the liveness monitor channel. This channel will report
596// a "true" or "false" on every publish, which indicates whether
597// or not the channel is still live. This channel is then picked up
598// by the service (i.e. rw_core / ro_core) to update readiness status
599// and/or take other actions.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000600func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
601 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
cbabu95f21522019-11-13 14:25:18 +0100602 if enable {
Girish Kumar935f7af2020-08-18 11:59:42 +0000603 sc.livenessMutex.Lock()
604 defer sc.livenessMutex.Unlock()
cbabu95f21522019-11-13 14:25:18 +0100605 if sc.liveness == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000606 logger.Info(ctx, "kafka-create-liveness-channel")
cbabu95f21522019-11-13 14:25:18 +0100607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612 // post intial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
Scott Baker86fce9a2019-12-12 09:47:17 -0800623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000626func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
627 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker86fce9a2019-12-12 09:47:17 -0800628 if enable {
Girish Kumar935f7af2020-08-18 11:59:42 +0000629 sc.healthinessMutex.Lock()
630 defer sc.healthinessMutex.Unlock()
Scott Baker86fce9a2019-12-12 09:47:17 -0800631 if sc.healthiness == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000632 logger.Info(ctx, "kafka-create-healthiness-channel")
Scott Baker86fce9a2019-12-12 09:47:17 -0800633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post intial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // liveness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
cbabu95f21522019-11-13 14:25:18 +0100649// send an empty message on the liveness channel to check whether connectivity has
650// been restored.
Neha Sharma96b7bf22020-06-15 10:37:32 +0000651func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
cbabu95f21522019-11-13 14:25:18 +0100652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000667 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
668 sc.updateLiveness(ctx, true)
cbabu95f21522019-11-13 14:25:18 +0100669 case notOk := <-sc.producer.Errors():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000670 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
671 if sc.isLivenessError(ctx, notOk) {
672 sc.updateLiveness(ctx, false)
cbabu95f21522019-11-13 14:25:18 +0100673 }
William Kurkianea869482019-04-09 15:16:11 -0400674 return notOk
675 }
676 return nil
677}
678
679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
Neha Sharma96b7bf22020-06-15 10:37:32 +0000699func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
William Kurkianea869482019-04-09 15:16:11 -0400700 config := sarama.NewConfig()
701 config.Version = sarama.V1_0_0_0
702
703 // Create a cluster Admin
704 var cAdmin sarama.ClusterAdmin
705 var err error
Neha Sharma3f221ae2020-04-29 19:02:12 +0000706 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000707 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
William Kurkianea869482019-04-09 15:16:11 -0400708 return err
709 }
710 sc.cAdmin = cAdmin
711 return nil
712}
713
714func (sc *SaramaClient) lockTopic(topic *Topic) {
715 sc.lockOfTopicLockMap.Lock()
716 if _, exist := sc.topicLockMap[topic.Name]; exist {
717 sc.lockOfTopicLockMap.Unlock()
718 sc.topicLockMap[topic.Name].Lock()
719 } else {
720 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
721 sc.lockOfTopicLockMap.Unlock()
722 sc.topicLockMap[topic.Name].Lock()
723 }
724}
725
726func (sc *SaramaClient) unLockTopic(topic *Topic) {
727 sc.lockOfTopicLockMap.Lock()
728 defer sc.lockOfTopicLockMap.Unlock()
729 if _, exist := sc.topicLockMap[topic.Name]; exist {
730 sc.topicLockMap[topic.Name].Unlock()
731 }
732}
733
734func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
735 sc.lockTopicToConsumerChannelMap.Lock()
736 defer sc.lockTopicToConsumerChannelMap.Unlock()
737 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
738 sc.topicToConsumerChannelMap[id] = arg
739 }
740}
741
William Kurkianea869482019-04-09 15:16:11 -0400742func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
743 sc.lockTopicToConsumerChannelMap.RLock()
744 defer sc.lockTopicToConsumerChannelMap.RUnlock()
745
746 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
747 return consumerCh
748 }
749 return nil
750}
751
Neha Sharma96b7bf22020-06-15 10:37:32 +0000752func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
William Kurkianea869482019-04-09 15:16:11 -0400753 sc.lockTopicToConsumerChannelMap.Lock()
754 defer sc.lockTopicToConsumerChannelMap.Unlock()
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 consumerCh.channels = append(consumerCh.channels, ch)
757 return
758 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000759 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400760}
761
762//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Neha Sharma96b7bf22020-06-15 10:37:32 +0000763func closeConsumers(ctx context.Context, consumers []interface{}) error {
William Kurkianea869482019-04-09 15:16:11 -0400764 var err error
765 for _, consumer := range consumers {
766 // Is it a partition consumers?
767 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
768 if errTemp := partionConsumer.Close(); errTemp != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000769 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
William Kurkianea869482019-04-09 15:16:11 -0400770 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
771 // This can occur on race condition
772 err = nil
773 } else {
774 err = errTemp
775 }
776 }
777 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
778 if errTemp := groupConsumer.Close(); errTemp != nil {
779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 }
787 }
788 return err
789}
790
Neha Sharma96b7bf22020-06-15 10:37:32 +0000791func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
William Kurkianea869482019-04-09 15:16:11 -0400792 sc.lockTopicToConsumerChannelMap.Lock()
793 defer sc.lockTopicToConsumerChannelMap.Unlock()
794 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
795 // Channel will be closed in the removeChannel method
Neha Sharma96b7bf22020-06-15 10:37:32 +0000796 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
William Kurkianea869482019-04-09 15:16:11 -0400797 // If there are no more channels then we can close the consumers itself
798 if len(consumerCh.channels) == 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000799 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
800 err := closeConsumers(ctx, consumerCh.consumers)
William Kurkianea869482019-04-09 15:16:11 -0400801 //err := consumerCh.consumers.Close()
802 delete(sc.topicToConsumerChannelMap, topic.Name)
803 return err
804 }
805 return nil
806 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000807 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400808 return errors.New("topic-does-not-exist")
809}
810
Neha Sharma96b7bf22020-06-15 10:37:32 +0000811func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
William Kurkianea869482019-04-09 15:16:11 -0400812 sc.lockTopicToConsumerChannelMap.Lock()
813 defer sc.lockTopicToConsumerChannelMap.Unlock()
814 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
815 for _, ch := range consumerCh.channels {
816 // Channel will be closed in the removeChannel method
Neha Sharma96b7bf22020-06-15 10:37:32 +0000817 removeChannel(ctx, consumerCh.channels, ch)
William Kurkianea869482019-04-09 15:16:11 -0400818 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000819 err := closeConsumers(ctx, consumerCh.consumers)
William Kurkianea869482019-04-09 15:16:11 -0400820 //if err == sarama.ErrUnknownTopicOrPartition {
821 // // Not an error
822 // err = nil
823 //}
824 //err := consumerCh.consumers.Close()
825 delete(sc.topicToConsumerChannelMap, topic.Name)
826 return err
827 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000828 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400829 return nil
830}
831
William Kurkianea869482019-04-09 15:16:11 -0400832//createPublisher creates the publisher which is used to send a message onto kafka
Neha Sharma96b7bf22020-06-15 10:37:32 +0000833func (sc *SaramaClient) createPublisher(ctx context.Context) error {
William Kurkianea869482019-04-09 15:16:11 -0400834 // This Creates the publisher
835 config := sarama.NewConfig()
836 config.Producer.Partitioner = sarama.NewRandomPartitioner
837 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
838 config.Producer.Flush.Messages = sc.producerFlushMessages
839 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
840 config.Producer.Return.Errors = sc.producerReturnErrors
841 config.Producer.Return.Successes = sc.producerReturnSuccess
842 //config.Producer.RequiredAcks = sarama.WaitForAll
843 config.Producer.RequiredAcks = sarama.WaitForLocal
844
Neha Sharma3f221ae2020-04-29 19:02:12 +0000845 brokers := []string{sc.KafkaAddress}
William Kurkianea869482019-04-09 15:16:11 -0400846
847 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000848 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400849 return err
850 } else {
851 sc.producer = producer
852 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000853 logger.Info(ctx, "Kafka-publisher-created")
William Kurkianea869482019-04-09 15:16:11 -0400854 return nil
855}
856
Neha Sharma96b7bf22020-06-15 10:37:32 +0000857func (sc *SaramaClient) createConsumer(ctx context.Context) error {
William Kurkianea869482019-04-09 15:16:11 -0400858 config := sarama.NewConfig()
859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Mahir Gunyele77977b2019-06-27 05:36:22 -0700864 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharma3f221ae2020-04-29 19:02:12 +0000865 brokers := []string{sc.KafkaAddress}
William Kurkianea869482019-04-09 15:16:11 -0400866
867 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000868 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400869 return err
870 } else {
871 sc.consumer = consumer
872 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000873 logger.Info(ctx, "Kafka-consumers-created")
William Kurkianea869482019-04-09 15:16:11 -0400874 return nil
875}
876
877// createGroupConsumer creates a consumers group
Neha Sharma96b7bf22020-06-15 10:37:32 +0000878func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
William Kurkianea869482019-04-09 15:16:11 -0400879 config := scc.NewConfig()
880 config.ClientID = uuid.New().String()
881 config.Group.Mode = scc.ConsumerModeMultiplex
cbabu95f21522019-11-13 14:25:18 +0100882 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
883 config.Consumer.Return.Errors = true
William Kurkianea869482019-04-09 15:16:11 -0400884 //config.Group.Return.Notifications = false
885 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
886 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
887 config.Consumer.Offsets.Initial = initialOffset
888 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharma3f221ae2020-04-29 19:02:12 +0000889 brokers := []string{sc.KafkaAddress}
William Kurkianea869482019-04-09 15:16:11 -0400890
891 topics := []string{topic.Name}
892 var consumer *scc.Consumer
893 var err error
894
895 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000896 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400897 return nil, err
898 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000899 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
William Kurkianea869482019-04-09 15:16:11 -0400900
901 //sc.groupConsumers[topic.Name] = consumer
902 sc.addToGroupConsumers(topic.Name, consumer)
903 return consumer, nil
904}
905
906// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
907// topic via the unique channel each subscriber received during subscription
908func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
909 // Need to go over all channels and publish messages to them - do we need to copy msg?
910 sc.lockTopicToConsumerChannelMap.RLock()
William Kurkianea869482019-04-09 15:16:11 -0400911 for _, ch := range consumerCh.channels {
912 go func(c chan *ic.InterContainerMessage) {
913 c <- protoMessage
914 }(ch)
915 }
npujarec5762e2020-01-01 14:08:48 +0530916 sc.lockTopicToConsumerChannelMap.RUnlock()
917
918 if callback := sc.metadataCallback; callback != nil {
Scott Bakered4a8e72020-04-17 11:10:20 -0700919 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
920 callback(protoMessage.Header.FromTopic, ts)
npujarec5762e2020-01-01 14:08:48 +0530921 }
William Kurkianea869482019-04-09 15:16:11 -0400922}
923
Neha Sharma96b7bf22020-06-15 10:37:32 +0000924func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
925 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400926startloop:
927 for {
928 select {
929 case err, ok := <-consumer.Errors():
930 if ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000931 if sc.isLivenessError(ctx, err) {
932 sc.updateLiveness(ctx, false)
933 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
cbabu116b73f2019-12-10 17:56:32 +0530934 }
William Kurkianea869482019-04-09 15:16:11 -0400935 } else {
936 // Channel is closed
937 break startloop
938 }
939 case msg, ok := <-consumer.Messages():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000940 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400941 if !ok {
942 // channel is closed
943 break startloop
944 }
945 msgBody := msg.Value
Neha Sharma96b7bf22020-06-15 10:37:32 +0000946 sc.updateLiveness(ctx, true)
947 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400948 icm := &ic.InterContainerMessage{}
949 if err := proto.Unmarshal(msgBody, icm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000950 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400951 continue
952 }
953 go sc.dispatchToConsumers(consumerChnls, icm)
954 case <-sc.doneCh:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000955 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400956 break startloop
957 }
958 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000959 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
960 sc.setUnhealthy(ctx)
William Kurkianea869482019-04-09 15:16:11 -0400961}
962
Neha Sharma96b7bf22020-06-15 10:37:32 +0000963func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
964 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400965
966startloop:
967 for {
968 select {
969 case err, ok := <-consumer.Errors():
970 if ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000971 if sc.isLivenessError(ctx, err) {
972 sc.updateLiveness(ctx, false)
Devmalya Pauldd23a992019-11-14 07:06:31 +0000973 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000974 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
William Kurkianea869482019-04-09 15:16:11 -0400975 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000976 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400977 // channel is closed
978 break startloop
979 }
980 case msg, ok := <-consumer.Messages():
981 if !ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000982 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -0400983 // Channel closed
984 break startloop
985 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000986 sc.updateLiveness(ctx, true)
987 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
William Kurkianea869482019-04-09 15:16:11 -0400988 msgBody := msg.Value
989 icm := &ic.InterContainerMessage{}
990 if err := proto.Unmarshal(msgBody, icm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000991 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -0400992 continue
993 }
994 go sc.dispatchToConsumers(consumerChnls, icm)
995 consumer.MarkOffset(msg, "")
996 case ntf := <-consumer.Notifications():
Neha Sharma96b7bf22020-06-15 10:37:32 +0000997 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
William Kurkianea869482019-04-09 15:16:11 -0400998 case <-sc.doneCh:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000999 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001000 break startloop
1001 }
1002 }
Neha Sharma96b7bf22020-06-15 10:37:32 +00001003 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
1004 sc.setUnhealthy(ctx)
William Kurkianea869482019-04-09 15:16:11 -04001005}
1006
Neha Sharma96b7bf22020-06-15 10:37:32 +00001007func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1008 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001009 var consumerCh *consumerChannels
1010 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001011 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001012 return errors.New("consumers-not-exist")
1013 }
1014 // For each consumer listening for that topic, start a consumption loop
1015 for _, consumer := range consumerCh.consumers {
1016 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001017 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
William Kurkianea869482019-04-09 15:16:11 -04001018 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001019 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
William Kurkianea869482019-04-09 15:16:11 -04001020 } else {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001021 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
William Kurkianea869482019-04-09 15:16:11 -04001022 return errors.New("invalid-consumer")
1023 }
1024 }
1025 return nil
1026}
1027
1028//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1029//// for that topic. It also starts the routine that listens for messages on that topic.
Neha Sharma96b7bf22020-06-15 10:37:32 +00001030func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -04001031 var pConsumers []sarama.PartitionConsumer
1032 var err error
1033
Neha Sharma96b7bf22020-06-15 10:37:32 +00001034 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1035 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001036 return nil, err
1037 }
1038
1039 consumersIf := make([]interface{}, 0)
1040 for _, pConsumer := range pConsumers {
1041 consumersIf = append(consumersIf, pConsumer)
1042 }
1043
1044 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1045 // unbuffered to verify race conditions.
1046 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1047 cc := &consumerChannels{
1048 consumers: consumersIf,
1049 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1050 }
1051
1052 // Add the consumers channel to the map
1053 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1054
1055 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001056 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001057 if err := sc.startConsumers(ctx, topic); err != nil {
1058 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001059 "topic": topic,
1060 "error": err})
1061 }
1062 }()
William Kurkianea869482019-04-09 15:16:11 -04001063
1064 return consumerListeningChannel, nil
1065}
1066
1067// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1068// for that topic. It also starts the routine that listens for messages on that topic.
Neha Sharma96b7bf22020-06-15 10:37:32 +00001069func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
William Kurkianea869482019-04-09 15:16:11 -04001070 // TODO: Replace this development partition consumers with a group consumers
1071 var pConsumer *scc.Consumer
1072 var err error
Neha Sharma96b7bf22020-06-15 10:37:32 +00001073 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1074 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001075 return nil, err
1076 }
1077 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1078 // unbuffered to verify race conditions.
1079 consumerListeningChannel := make(chan *ic.InterContainerMessage)
1080 cc := &consumerChannels{
1081 consumers: []interface{}{pConsumer},
1082 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
1083 }
1084
1085 // Add the consumers channel to the map
1086 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1087
1088 //Start a consumers to listen on that specific topic
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001089 go func() {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001090 if err := sc.startConsumers(ctx, topic); err != nil {
1091 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal02f784d2020-02-14 09:34:02 +00001092 "topic": topic,
1093 "error": err})
1094 }
1095 }()
William Kurkianea869482019-04-09 15:16:11 -04001096
1097 return consumerListeningChannel, nil
1098}
1099
Neha Sharma96b7bf22020-06-15 10:37:32 +00001100func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1101 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001102 partitionList, err := sc.consumer.Partitions(topic.Name)
1103 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001104 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001105 return nil, err
1106 }
1107
1108 pConsumers := make([]sarama.PartitionConsumer, 0)
1109 for _, partition := range partitionList {
1110 var pConsumer sarama.PartitionConsumer
1111 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001112 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
William Kurkianea869482019-04-09 15:16:11 -04001113 return nil, err
1114 }
1115 pConsumers = append(pConsumers, pConsumer)
1116 }
1117 return pConsumers, nil
1118}
1119
Neha Sharma96b7bf22020-06-15 10:37:32 +00001120func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
William Kurkianea869482019-04-09 15:16:11 -04001121 var i int
1122 var channel chan *ic.InterContainerMessage
1123 for i, channel = range channels {
1124 if channel == ch {
1125 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1126 close(channel)
Neha Sharma96b7bf22020-06-15 10:37:32 +00001127 logger.Debug(ctx, "channel-closed")
William Kurkianea869482019-04-09 15:16:11 -04001128 return channels[:len(channels)-1]
1129 }
1130 }
1131 return channels
1132}
1133
William Kurkianea869482019-04-09 15:16:11 -04001134func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1135 sc.lockOfGroupConsumers.Lock()
1136 defer sc.lockOfGroupConsumers.Unlock()
1137 if _, exist := sc.groupConsumers[topic]; !exist {
1138 sc.groupConsumers[topic] = consumer
1139 }
1140}
1141
Neha Sharma96b7bf22020-06-15 10:37:32 +00001142func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
William Kurkianea869482019-04-09 15:16:11 -04001143 sc.lockOfGroupConsumers.Lock()
1144 defer sc.lockOfGroupConsumers.Unlock()
1145 if _, exist := sc.groupConsumers[topic]; exist {
1146 consumer := sc.groupConsumers[topic]
1147 delete(sc.groupConsumers, topic)
Matt Jeanneret384d8c92019-05-06 14:27:31 -04001148 if err := consumer.Close(); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +00001149 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -04001150 return err
1151 }
1152 }
1153 return nil
1154}