blob: 87c7ce4114d7da06fe6c307287059b8dc3c5e514 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "strings"
23 "sync"
24 "time"
25
Scott Bakerf2596722019-09-27 12:39:56 -070026 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050027 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000028 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050029 "github.com/golang/protobuf/proto"
Scott Baker504b4802020-04-17 10:12:20 -070030 "github.com/golang/protobuf/ptypes"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050031 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080032 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo43c82122018-11-22 18:38:28 -050034)
35
khenaidoo4c1a5bf2018-11-29 15:53:42 -050036// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
37// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
38//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050039type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050040 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050041 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050042}
43
npujar467fe752020-01-16 20:17:45 +053044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
khenaidoo43c82122018-11-22 18:38:28 -050047// SaramaClient represents the messaging proxy
48type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050049 cAdmin sarama.ClusterAdmin
Neha Sharmad1387da2020-05-07 20:07:28 +000050 KafkaAddress string
khenaidoo43c82122018-11-22 18:38:28 -050051 producer sarama.AsyncProducer
52 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050053 groupConsumers map[string]*scc.Consumer
khenaidoo2c6a0992019-04-29 13:46:56 -040054 lockOfGroupConsumers sync.RWMutex
khenaidooca301322019-01-09 23:06:32 -050055 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050056 consumerType int
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050070 doneCh chan int
Scott Baker504b4802020-04-17 10:12:20 -070071 metadataCallback func(fromTopic string, timestamp time.Time)
khenaidoo43c82122018-11-22 18:38:28 -050072 topicToConsumerChannelMap map[string]*consumerChannels
73 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050074 topicLockMap map[string]*sync.RWMutex
75 lockOfTopicLockMap sync.RWMutex
Abhilash S.L294ff522019-06-26 18:14:33 +053076 metadataMaxRetry int
Scott Bakeree6a0872019-10-29 15:59:52 -070077 alive bool
78 liveness chan bool
79 livenessChannelInterval time.Duration
80 lastLivenessTime time.Time
81 started bool
serkant.uluderya2ae470f2020-01-21 11:13:09 -080082 healthy bool
83 healthiness chan bool
khenaidoo43c82122018-11-22 18:38:28 -050084}
85
86type SaramaClientOption func(*SaramaClient)
87
Neha Sharmad1387da2020-05-07 20:07:28 +000088func Address(address string) SaramaClientOption {
khenaidoo43c82122018-11-22 18:38:28 -050089 return func(args *SaramaClient) {
Neha Sharmad1387da2020-05-07 20:07:28 +000090 args.KafkaAddress = address
khenaidoo43c82122018-11-22 18:38:28 -050091 }
92}
93
khenaidooca301322019-01-09 23:06:32 -050094func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
khenaidoo90847922018-12-03 14:47:51 -0500130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
khenaidoo90847922018-12-03 14:47:51 -0500148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
Abhilash S.L294ff522019-06-26 18:14:33 +0530184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
Scott Bakeree6a0872019-10-29 15:59:52 -0700190func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.livenessChannelInterval = opt
193 }
194}
195
khenaidoo43c82122018-11-22 18:38:28 -0500196func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
197 client := &SaramaClient{
Neha Sharmad1387da2020-05-07 20:07:28 +0000198 KafkaAddress: DefaultKafkaAddress,
khenaidoo43c82122018-11-22 18:38:28 -0500199 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500200 client.consumerType = DefaultConsumerType
201 client.producerFlushFrequency = DefaultProducerFlushFrequency
202 client.producerFlushMessages = DefaultProducerFlushMessages
203 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
204 client.producerReturnErrors = DefaultProducerReturnErrors
205 client.producerReturnSuccess = DefaultProducerReturnSuccess
206 client.producerRetryMax = DefaultProducerRetryMax
207 client.producerRetryBackOff = DefaultProducerRetryBackoff
208 client.consumerMaxwait = DefaultConsumerMaxwait
209 client.maxProcessingTime = DefaultMaxProcessingTime
210 client.numPartitions = DefaultNumberPartitions
211 client.numReplicas = DefaultNumberReplicas
212 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530213 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700214 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500215
216 for _, option := range opts {
217 option(client)
218 }
219
khenaidooca301322019-01-09 23:06:32 -0500220 client.groupConsumers = make(map[string]*scc.Consumer)
221
khenaidoo43c82122018-11-22 18:38:28 -0500222 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500223 client.topicLockMap = make(map[string]*sync.RWMutex)
224 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500225 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700226
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800227 // healthy and alive until proven otherwise
Scott Bakeree6a0872019-10-29 15:59:52 -0700228 client.alive = true
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800229 client.healthy = true
Scott Bakeree6a0872019-10-29 15:59:52 -0700230
khenaidoo43c82122018-11-22 18:38:28 -0500231 return client
232}
233
Rohan Agrawal31f21802020-06-12 05:38:46 +0000234func (sc *SaramaClient) Start(ctx context.Context) error {
235 logger.Info(ctx, "Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500236
237 // Create the Done channel
238 sc.doneCh = make(chan int, 1)
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240 var err error
241
khenaidoob3244212019-08-27 14:32:27 -0400242 // Add a cleanup in case of failure to startup
243 defer func() {
244 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000245 sc.Stop(ctx)
khenaidoob3244212019-08-27 14:32:27 -0400246 }
247 }()
248
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500249 // Create the Cluster Admin
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 if err = sc.createClusterAdmin(ctx); err != nil {
251 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500252 return err
253 }
254
khenaidoo43c82122018-11-22 18:38:28 -0500255 // Create the Publisher
Rohan Agrawal31f21802020-06-12 05:38:46 +0000256 if err := sc.createPublisher(ctx); err != nil {
257 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500258 return err
259 }
260
khenaidooca301322019-01-09 23:06:32 -0500261 if sc.consumerType == DefaultConsumerType {
262 // Create the master consumers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000263 if err := sc.createConsumer(ctx); err != nil {
264 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidooca301322019-01-09 23:06:32 -0500265 return err
266 }
khenaidoo43c82122018-11-22 18:38:28 -0500267 }
268
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500269 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500270 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
271
Rohan Agrawal31f21802020-06-12 05:38:46 +0000272 logger.Info(ctx, "kafka-sarama-client-started")
khenaidooca301322019-01-09 23:06:32 -0500273
Scott Bakeree6a0872019-10-29 15:59:52 -0700274 sc.started = true
275
khenaidoo43c82122018-11-22 18:38:28 -0500276 return nil
277}
278
Rohan Agrawal31f21802020-06-12 05:38:46 +0000279func (sc *SaramaClient) Stop(ctx context.Context) {
280 logger.Info(ctx, "stopping-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500281
Scott Bakeree6a0872019-10-29 15:59:52 -0700282 sc.started = false
283
khenaidoo43c82122018-11-22 18:38:28 -0500284 //Send a message over the done channel to close all long running routines
285 sc.doneCh <- 1
286
khenaidoo43c82122018-11-22 18:38:28 -0500287 if sc.producer != nil {
288 if err := sc.producer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000289 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500290 }
291 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500292
khenaidoo43c82122018-11-22 18:38:28 -0500293 if sc.consumer != nil {
294 if err := sc.consumer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000295 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500296 }
297 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298
khenaidooca301322019-01-09 23:06:32 -0500299 for key, val := range sc.groupConsumers {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000300 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
khenaidooca301322019-01-09 23:06:32 -0500301 if err := val.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000302 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500303 }
304 }
305
306 if sc.cAdmin != nil {
307 if err := sc.cAdmin.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000308 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500309 }
310 }
311
312 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500313 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500314
Rohan Agrawal31f21802020-06-12 05:38:46 +0000315 logger.Info(ctx, "sarama-client-stopped")
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500316}
317
khenaidooca301322019-01-09 23:06:32 -0500318//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
319// the invoking function must hold the lock
Rohan Agrawal31f21802020-06-12 05:38:46 +0000320func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500321 // Set the topic details
322 topicDetail := &sarama.TopicDetail{}
323 topicDetail.NumPartitions = int32(numPartition)
324 topicDetail.ReplicationFactor = int16(repFactor)
325 topicDetail.ConfigEntries = make(map[string]*string)
326 topicDetails := make(map[string]*sarama.TopicDetail)
327 topicDetails[topic.Name] = topicDetail
328
329 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
330 if err == sarama.ErrTopicAlreadyExists {
331 // Not an error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000332 logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500333 return nil
334 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000335 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500336 return err
337 }
338 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
339 // do so.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000340 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500341 return nil
342}
343
khenaidooca301322019-01-09 23:06:32 -0500344//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
345// ensure no two go routines are performing operations on the same topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000346func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
khenaidooca301322019-01-09 23:06:32 -0500347 sc.lockTopic(topic)
348 defer sc.unLockTopic(topic)
349
Rohan Agrawal31f21802020-06-12 05:38:46 +0000350 return sc.createTopic(ctx, topic, numPartition, repFactor)
khenaidooca301322019-01-09 23:06:32 -0500351}
352
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500353//DeleteTopic removes a topic from the kafka Broker
Rohan Agrawal31f21802020-06-12 05:38:46 +0000354func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500358 // Remove the topic from the broker
359 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
360 if err == sarama.ErrUnknownTopicOrPartition {
361 // Not an error as does not exist
Rohan Agrawal31f21802020-06-12 05:38:46 +0000362 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500363 return nil
364 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000365 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500366 return err
367 }
368
369 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000370 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
371 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500372 return err
373 }
374 return nil
375}
376
377// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
378// messages from that topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000379func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500380 sc.lockTopic(topic)
381 defer sc.unLockTopic(topic)
382
Rohan Agrawal31f21802020-06-12 05:38:46 +0000383 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500384
385 // If a consumers already exist for that topic then resuse it
386 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000387 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500388 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500389 ch := make(chan *ic.InterContainerMessage)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000390 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500391 return ch, nil
392 }
393
394 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500395 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500396 var err error
397
398 // Use the consumerType option to figure out the type of consumer to launch
399 if sc.consumerType == PartitionConsumer {
400 if sc.autoCreateTopic {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000401 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
402 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500403 return nil, err
404 }
405 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000406 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
407 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500408 return nil, err
409 }
410 } else if sc.consumerType == GroupCustomer {
411 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
412 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500413 //if sc.autoCreateTopic {
414 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000415 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500416 // return nil, err
417 // }
418 //}
419 //groupId := sc.consumerGroupName
420 groupId := getGroupId(kvArgs...)
421 // Include the group prefix
422 if groupId != "" {
423 groupId = sc.consumerGroupPrefix + groupId
424 } else {
425 // Need to use a unique group Id per topic
426 groupId = sc.consumerGroupPrefix + topic.Name
427 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000428 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
429 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500430 return nil, err
431 }
khenaidooca301322019-01-09 23:06:32 -0500432
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500433 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000434 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500435 return nil, errors.New("unknown-consumer-type")
436 }
437
438 return consumerListeningChannel, nil
439}
440
441//UnSubscribe unsubscribe a consumer from a given topic
Rohan Agrawal31f21802020-06-12 05:38:46 +0000442func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500443 sc.lockTopic(topic)
444 defer sc.unLockTopic(topic)
445
Rohan Agrawal31f21802020-06-12 05:38:46 +0000446 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500447 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +0000448 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
449 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500450 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000451 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
452 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500453 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500454 return err
455}
456
Rohan Agrawal31f21802020-06-12 05:38:46 +0000457func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
npujar467fe752020-01-16 20:17:45 +0530458 sc.metadataCallback = callback
459}
460
Rohan Agrawal31f21802020-06-12 05:38:46 +0000461func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700462 // Post a consistent stream of liveness data to the channel,
463 // so that in a live state, the core does not timeout and
464 // send a forced liveness message. Production of liveness
465 // events to the channel is rate-limited by livenessChannelInterval.
466 if sc.liveness != nil {
467 if sc.alive != alive {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000468 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Bakeree6a0872019-10-29 15:59:52 -0700469 sc.liveness <- alive
470 sc.lastLivenessTime = time.Now()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000471 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000472 logger.Info(ctx, "update-liveness-channel-because-interval")
Scott Bakeree6a0872019-10-29 15:59:52 -0700473 sc.liveness <- alive
474 sc.lastLivenessTime = time.Now()
475 }
476 }
477
478 // Only emit a log message when the state changes
479 if sc.alive != alive {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000480 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Bakeree6a0872019-10-29 15:59:52 -0700481 sc.alive = alive
482 }
483}
484
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800485// Once unhealthy, we never go back
Rohan Agrawal31f21802020-06-12 05:38:46 +0000486func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800487 sc.healthy = false
488 if sc.healthiness != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000489 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800490 sc.healthiness <- sc.healthy
491 }
492}
493
Rohan Agrawal31f21802020-06-12 05:38:46 +0000494func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000495 // Sarama producers and consumers encapsulate the error inside
496 // a ProducerError or ConsumerError struct.
497 if prodError, ok := err.(*sarama.ProducerError); ok {
498 err = prodError.Err
499 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
500 err = consumerError.Err
501 }
502
503 // Sarama-Cluster will compose the error into a ClusterError struct,
504 // which we can't do a compare by reference. To handle that, we the
505 // best we can do is compare the error strings.
506
507 switch err.Error() {
508 case context.DeadlineExceeded.Error():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000509 logger.Info(ctx, "is-liveness-error-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000510 return true
511 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000512 logger.Info(ctx, "is-liveness-error-no-brokers")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000513 return true
514 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000515 logger.Info(ctx, "is-liveness-error-shutting-down")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000516 return true
517 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000518 logger.Info(ctx, "is-liveness-error-not-available")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000519 return true
520 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000521 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000522 return true
523 }
524
525 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000526 logger.Info(ctx, "is-liveness-error-connection-refused")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800527 return true
528 }
529
530 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000531 logger.Info(ctx, "is-liveness-error-io-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000532 return true
533 }
534
535 // Other errors shouldn't trigger a loss of liveness
536
Rohan Agrawal31f21802020-06-12 05:38:46 +0000537 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000538
539 return false
540}
541
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500542// send formats and sends the request onto the kafka messaging bus.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000543func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500544
545 // Assert message is a proto message
546 var protoMsg proto.Message
547 var ok bool
548 // ascertain the value interface type is a proto.Message
549 if protoMsg, ok = msg.(proto.Message); !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000550 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000551 return fmt.Errorf("not-a-proto-msg-%s", msg)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500552 }
553
554 var marshalled []byte
555 var err error
556 // Create the Sarama producer message
557 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000558 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500559 return err
560 }
561 key := ""
562 if len(keys) > 0 {
563 key = keys[0] // Only the first key is relevant
564 }
565 kafkaMsg := &sarama.ProducerMessage{
566 Topic: topic.Name,
567 Key: sarama.StringEncoder(key),
568 Value: sarama.ByteEncoder(marshalled),
569 }
570
571 // Send message to kafka
572 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500573 // Wait for result
574 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
575 select {
576 case ok := <-sc.producer.Successes():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000577 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
578 sc.updateLiveness(ctx, true)
khenaidoo90847922018-12-03 14:47:51 -0500579 case notOk := <-sc.producer.Errors():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000580 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
581 if sc.isLivenessError(ctx, notOk) {
582 sc.updateLiveness(ctx, false)
Scott Bakeree6a0872019-10-29 15:59:52 -0700583 }
584 return notOk
585 }
586 return nil
587}
588
589// Enable the liveness monitor channel. This channel will report
590// a "true" or "false" on every publish, which indicates whether
591// or not the channel is still live. This channel is then picked up
592// by the service (i.e. rw_core / ro_core) to update readiness status
593// and/or take other actions.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000594func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
595 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Bakeree6a0872019-10-29 15:59:52 -0700596 if enable {
597 if sc.liveness == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000598 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Bakeree6a0872019-10-29 15:59:52 -0700599 // At least 1, so we can immediately post to it without blocking
600 // Setting a bigger number (10) allows the monitor to fall behind
601 // without blocking others. The monitor shouldn't really fall
602 // behind...
603 sc.liveness = make(chan bool, 10)
604 // post intial state to the channel
605 sc.liveness <- sc.alive
606 }
607 } else {
608 // TODO: Think about whether we need the ability to turn off
609 // liveness monitoring
610 panic("Turning off liveness reporting is not supported")
611 }
612 return sc.liveness
613}
614
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800615// Enable the Healthiness monitor channel. This channel will report "false"
616// if the kafka consumers die, or some other problem occurs which is
617// catastrophic that would require re-creating the client.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000618func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
619 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800620 if enable {
621 if sc.healthiness == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000622 logger.Info(ctx, "kafka-create-healthiness-channel")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800623 // At least 1, so we can immediately post to it without blocking
624 // Setting a bigger number (10) allows the monitor to fall behind
625 // without blocking others. The monitor shouldn't really fall
626 // behind...
627 sc.healthiness = make(chan bool, 10)
628 // post intial state to the channel
629 sc.healthiness <- sc.healthy
630 }
631 } else {
632 // TODO: Think about whether we need the ability to turn off
633 // liveness monitoring
634 panic("Turning off healthiness reporting is not supported")
635 }
636 return sc.healthiness
637}
638
Scott Bakeree6a0872019-10-29 15:59:52 -0700639// send an empty message on the liveness channel to check whether connectivity has
640// been restored.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000641func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Bakeree6a0872019-10-29 15:59:52 -0700642 if !sc.started {
643 return fmt.Errorf("SendLiveness() called while not started")
644 }
645
646 kafkaMsg := &sarama.ProducerMessage{
647 Topic: "_liveness_test",
648 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
649 }
650
651 // Send message to kafka
652 sc.producer.Input() <- kafkaMsg
653 // Wait for result
654 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
655 select {
656 case ok := <-sc.producer.Successes():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000657 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
658 sc.updateLiveness(ctx, true)
Scott Bakeree6a0872019-10-29 15:59:52 -0700659 case notOk := <-sc.producer.Errors():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000660 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
661 if sc.isLivenessError(ctx, notOk) {
662 sc.updateLiveness(ctx, false)
Scott Bakeree6a0872019-10-29 15:59:52 -0700663 }
khenaidoo90847922018-12-03 14:47:51 -0500664 return notOk
665 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500666 return nil
667}
668
khenaidooca301322019-01-09 23:06:32 -0500669// getGroupId returns the group id from the key-value args.
670func getGroupId(kvArgs ...*KVArg) string {
671 for _, arg := range kvArgs {
672 if arg.Key == GroupIdKey {
673 return arg.Value.(string)
674 }
675 }
676 return ""
677}
678
khenaidoo731697e2019-01-29 16:03:29 -0500679// getOffset returns the offset from the key-value args.
680func getOffset(kvArgs ...*KVArg) int64 {
681 for _, arg := range kvArgs {
682 if arg.Key == Offset {
683 return arg.Value.(int64)
684 }
685 }
686 return sarama.OffsetNewest
687}
688
Rohan Agrawal31f21802020-06-12 05:38:46 +0000689func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500690 config := sarama.NewConfig()
691 config.Version = sarama.V1_0_0_0
692
693 // Create a cluster Admin
694 var cAdmin sarama.ClusterAdmin
695 var err error
Neha Sharmad1387da2020-05-07 20:07:28 +0000696 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000697 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500698 return err
699 }
700 sc.cAdmin = cAdmin
701 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500702}
703
khenaidood2b6df92018-12-13 16:37:20 -0500704func (sc *SaramaClient) lockTopic(topic *Topic) {
705 sc.lockOfTopicLockMap.Lock()
706 if _, exist := sc.topicLockMap[topic.Name]; exist {
707 sc.lockOfTopicLockMap.Unlock()
708 sc.topicLockMap[topic.Name].Lock()
709 } else {
710 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
711 sc.lockOfTopicLockMap.Unlock()
712 sc.topicLockMap[topic.Name].Lock()
713 }
714}
715
716func (sc *SaramaClient) unLockTopic(topic *Topic) {
717 sc.lockOfTopicLockMap.Lock()
718 defer sc.lockOfTopicLockMap.Unlock()
719 if _, exist := sc.topicLockMap[topic.Name]; exist {
720 sc.topicLockMap[topic.Name].Unlock()
721 }
722}
723
khenaidoo43c82122018-11-22 18:38:28 -0500724func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
725 sc.lockTopicToConsumerChannelMap.Lock()
726 defer sc.lockTopicToConsumerChannelMap.Unlock()
727 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
728 sc.topicToConsumerChannelMap[id] = arg
729 }
730}
731
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500732func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400733 sc.lockTopicToConsumerChannelMap.RLock()
734 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500735
736 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
737 return consumerCh
738 }
739 return nil
740}
741
Rohan Agrawal31f21802020-06-12 05:38:46 +0000742func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500743 sc.lockTopicToConsumerChannelMap.Lock()
744 defer sc.lockTopicToConsumerChannelMap.Unlock()
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 consumerCh.channels = append(consumerCh.channels, ch)
747 return
748 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000749 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500750}
751
752//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000753func closeConsumers(ctx context.Context, consumers []interface{}) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500754 var err error
755 for _, consumer := range consumers {
756 // Is it a partition consumers?
757 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
758 if errTemp := partionConsumer.Close(); errTemp != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000759 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500760 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
761 // This can occur on race condition
762 err = nil
763 } else {
764 err = errTemp
765 }
766 }
767 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
768 if errTemp := groupConsumer.Close(); errTemp != nil {
769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 }
777 }
778 return err
khenaidoo43c82122018-11-22 18:38:28 -0500779}
780
Rohan Agrawal31f21802020-06-12 05:38:46 +0000781func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500782 sc.lockTopicToConsumerChannelMap.Lock()
783 defer sc.lockTopicToConsumerChannelMap.Unlock()
784 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
785 // Channel will be closed in the removeChannel method
Rohan Agrawal31f21802020-06-12 05:38:46 +0000786 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500787 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500788 if len(consumerCh.channels) == 0 {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000789 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
790 err := closeConsumers(ctx, consumerCh.consumers)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500791 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500792 delete(sc.topicToConsumerChannelMap, topic.Name)
793 return err
794 }
795 return nil
796 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000797 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500798 return errors.New("topic-does-not-exist")
799}
800
Rohan Agrawal31f21802020-06-12 05:38:46 +0000801func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
khenaidoo43c82122018-11-22 18:38:28 -0500802 sc.lockTopicToConsumerChannelMap.Lock()
803 defer sc.lockTopicToConsumerChannelMap.Unlock()
804 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
805 for _, ch := range consumerCh.channels {
806 // Channel will be closed in the removeChannel method
Rohan Agrawal31f21802020-06-12 05:38:46 +0000807 removeChannel(ctx, consumerCh.channels, ch)
khenaidoo43c82122018-11-22 18:38:28 -0500808 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000809 err := closeConsumers(ctx, consumerCh.consumers)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500810 //if err == sarama.ErrUnknownTopicOrPartition {
811 // // Not an error
812 // err = nil
813 //}
814 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500815 delete(sc.topicToConsumerChannelMap, topic.Name)
816 return err
817 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000818 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500819 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500820}
821
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500822//createPublisher creates the publisher which is used to send a message onto kafka
Rohan Agrawal31f21802020-06-12 05:38:46 +0000823func (sc *SaramaClient) createPublisher(ctx context.Context) error {
khenaidoo43c82122018-11-22 18:38:28 -0500824 // This Creates the publisher
825 config := sarama.NewConfig()
826 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500827 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
828 config.Producer.Flush.Messages = sc.producerFlushMessages
829 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
830 config.Producer.Return.Errors = sc.producerReturnErrors
831 config.Producer.Return.Successes = sc.producerReturnSuccess
832 //config.Producer.RequiredAcks = sarama.WaitForAll
833 config.Producer.RequiredAcks = sarama.WaitForLocal
834
Neha Sharmad1387da2020-05-07 20:07:28 +0000835 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500836
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500837 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000838 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500839 return err
840 } else {
841 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500842 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000843 logger.Info(ctx, "Kafka-publisher-created")
khenaidoo43c82122018-11-22 18:38:28 -0500844 return nil
845}
846
Rohan Agrawal31f21802020-06-12 05:38:46 +0000847func (sc *SaramaClient) createConsumer(ctx context.Context) error {
khenaidoo43c82122018-11-22 18:38:28 -0500848 config := sarama.NewConfig()
849 config.Consumer.Return.Errors = true
850 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500851 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
852 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500853 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530854 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmad1387da2020-05-07 20:07:28 +0000855 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500856
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500857 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000858 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500859 return err
860 } else {
861 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500862 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000863 logger.Info(ctx, "Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500864 return nil
865}
866
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500867// createGroupConsumer creates a consumers group
Rohan Agrawal31f21802020-06-12 05:38:46 +0000868func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500869 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500870 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500871 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700872 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
873 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500874 //config.Group.Return.Notifications = false
875 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
876 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500877 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500878 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
Neha Sharmad1387da2020-05-07 20:07:28 +0000879 brokers := []string{sc.KafkaAddress}
khenaidoo43c82122018-11-22 18:38:28 -0500880
khenaidoo43c82122018-11-22 18:38:28 -0500881 topics := []string{topic.Name}
882 var consumer *scc.Consumer
883 var err error
884
khenaidooca301322019-01-09 23:06:32 -0500885 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000886 logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500887 return nil, err
888 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000889 logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500890
891 //sc.groupConsumers[topic.Name] = consumer
892 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500893 return consumer, nil
894}
895
khenaidoo43c82122018-11-22 18:38:28 -0500896// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500897// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500898func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500899 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400900 sc.lockTopicToConsumerChannelMap.RLock()
khenaidoo43c82122018-11-22 18:38:28 -0500901 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500902 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500903 c <- protoMessage
904 }(ch)
905 }
npujar467fe752020-01-16 20:17:45 +0530906 sc.lockTopicToConsumerChannelMap.RUnlock()
907
908 if callback := sc.metadataCallback; callback != nil {
Scott Baker504b4802020-04-17 10:12:20 -0700909 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
910 callback(protoMessage.Header.FromTopic, ts)
npujar467fe752020-01-16 20:17:45 +0530911 }
khenaidoo43c82122018-11-22 18:38:28 -0500912}
913
Rohan Agrawal31f21802020-06-12 05:38:46 +0000914func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
915 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500916startloop:
917 for {
918 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500919 case err, ok := <-consumer.Errors():
920 if ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000921 if sc.isLivenessError(ctx, err) {
922 sc.updateLiveness(ctx, false)
923 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500924 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500925 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500926 // Channel is closed
927 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500928 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500929 case msg, ok := <-consumer.Messages():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000930 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500931 if !ok {
932 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500933 break startloop
934 }
935 msgBody := msg.Value
Rohan Agrawal31f21802020-06-12 05:38:46 +0000936 sc.updateLiveness(ctx, true)
937 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500938 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500939 if err := proto.Unmarshal(msgBody, icm); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000940 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500941 continue
942 }
943 go sc.dispatchToConsumers(consumerChnls, icm)
944 case <-sc.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000945 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500946 break startloop
947 }
948 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000949 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
950 sc.setUnhealthy(ctx)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500951}
952
Rohan Agrawal31f21802020-06-12 05:38:46 +0000953func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
954 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500955
956startloop:
957 for {
958 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500959 case err, ok := <-consumer.Errors():
960 if ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000961 if sc.isLivenessError(ctx, err) {
962 sc.updateLiveness(ctx, false)
Devmalya Paulc594bb32019-11-06 07:34:27 +0000963 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000964 logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500965 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000966 logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500967 // channel is closed
968 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500969 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500970 case msg, ok := <-consumer.Messages():
971 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000972 logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500973 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500974 break startloop
975 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000976 sc.updateLiveness(ctx, true)
977 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500978 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500979 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500980 if err := proto.Unmarshal(msgBody, icm); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000981 logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500982 continue
983 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500984 go sc.dispatchToConsumers(consumerChnls, icm)
985 consumer.MarkOffset(msg, "")
986 case ntf := <-consumer.Notifications():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000987 logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500988 case <-sc.doneCh:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000989 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500990 break startloop
991 }
992 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000993 logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
994 sc.setUnhealthy(ctx)
khenaidoo43c82122018-11-22 18:38:28 -0500995}
996
Rohan Agrawal31f21802020-06-12 05:38:46 +0000997func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
998 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500999 var consumerCh *consumerChannels
1000 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001001 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001002 return errors.New("consumers-not-exist")
1003 }
1004 // For each consumer listening for that topic, start a consumption loop
1005 for _, consumer := range consumerCh.consumers {
1006 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001007 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001008 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001009 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001010 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001011 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001012 return errors.New("invalid-consumer")
1013 }
1014 }
1015 return nil
1016}
1017
1018//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1019//// for that topic. It also starts the routine that listens for messages on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +00001020func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001021 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001022 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001023
Rohan Agrawal31f21802020-06-12 05:38:46 +00001024 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1025 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001026 return nil, err
1027 }
1028
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001029 consumersIf := make([]interface{}, 0)
1030 for _, pConsumer := range pConsumers {
1031 consumersIf = append(consumersIf, pConsumer)
1032 }
1033
1034 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001035 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001036 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001037 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001038 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001039 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001040 }
1041
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001042 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001043 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1044
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001045 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001046 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001047 if err := sc.startConsumers(ctx, topic); err != nil {
1048 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001049 "topic": topic,
1050 "error": err})
1051 }
1052 }()
khenaidoo43c82122018-11-22 18:38:28 -05001053
1054 return consumerListeningChannel, nil
1055}
1056
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001057// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1058// for that topic. It also starts the routine that listens for messages on that topic.
Rohan Agrawal31f21802020-06-12 05:38:46 +00001059func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001060 // TODO: Replace this development partition consumers with a group consumers
1061 var pConsumer *scc.Consumer
1062 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +00001063 if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
1064 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001065 return nil, err
1066 }
1067 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1068 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001069 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001070 cc := &consumerChannels{
1071 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001072 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001073 }
1074
1075 // Add the consumers channel to the map
1076 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1077
1078 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001079 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001080 if err := sc.startConsumers(ctx, topic); err != nil {
1081 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001082 "topic": topic,
1083 "error": err})
1084 }
1085 }()
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001086
1087 return consumerListeningChannel, nil
1088}
1089
Rohan Agrawal31f21802020-06-12 05:38:46 +00001090func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1091 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001092 partitionList, err := sc.consumer.Partitions(topic.Name)
1093 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001094 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001095 return nil, err
1096 }
1097
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001098 pConsumers := make([]sarama.PartitionConsumer, 0)
1099 for _, partition := range partitionList {
1100 var pConsumer sarama.PartitionConsumer
1101 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001102 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001103 return nil, err
1104 }
1105 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001106 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001107 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001108}
1109
Rohan Agrawal31f21802020-06-12 05:38:46 +00001110func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001111 var i int
khenaidoo79232702018-12-04 11:00:41 -05001112 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001113 for i, channel = range channels {
1114 if channel == ch {
1115 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1116 close(channel)
Rohan Agrawal31f21802020-06-12 05:38:46 +00001117 logger.Debug(ctx, "channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001118 return channels[:len(channels)-1]
1119 }
1120 }
1121 return channels
1122}
khenaidoo7ff26c72019-01-16 14:55:48 -05001123
khenaidoo7ff26c72019-01-16 14:55:48 -05001124func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1125 sc.lockOfGroupConsumers.Lock()
1126 defer sc.lockOfGroupConsumers.Unlock()
1127 if _, exist := sc.groupConsumers[topic]; !exist {
1128 sc.groupConsumers[topic] = consumer
1129 }
1130}
1131
Rohan Agrawal31f21802020-06-12 05:38:46 +00001132func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
khenaidoo7ff26c72019-01-16 14:55:48 -05001133 sc.lockOfGroupConsumers.Lock()
1134 defer sc.lockOfGroupConsumers.Unlock()
1135 if _, exist := sc.groupConsumers[topic]; exist {
1136 consumer := sc.groupConsumers[topic]
1137 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001138 if err := consumer.Close(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +00001139 logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -05001140 return err
1141 }
1142 }
1143 return nil
1144}