blob: 468e546ec4222f304dd2a99ab40d454fa34995e5 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "strings"
23 "sync"
24 "time"
25
Scott Bakerf2596722019-09-27 12:39:56 -070026 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050027 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000028 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050029 "github.com/golang/protobuf/proto"
Scott Baker504b4802020-04-17 10:12:20 -070030 "github.com/golang/protobuf/ptypes"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050031 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080032 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo43c82122018-11-22 18:38:28 -050034)
35
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	// consumers holds the consumer(s) reading the topic. They are kept as interface{} because
	// a topic can be served by either a partition consumer or a group consumer.
	consumers []interface{}
	// channels are the subscriber channels every received message is broadcast to.
	channels []chan *ic.InterContainerMessage
}
43
npujar467fe752020-01-16 20:17:45 +053044// static check to ensure SaramaClient implements Client
45var _ Client = &SaramaClient{}
46
// SaramaClient represents the messaging proxy. It bundles a Sarama cluster admin, an async
// producer and the consumers used for subscriptions, together with the liveness and
// healthiness state that can be reported to the owning service over channels.
type SaramaClient struct {
	// cAdmin creates and deletes topics (see createTopic / DeleteTopic).
	cAdmin sarama.ClusterAdmin
	// Broker address; joined as "host:port" when the cluster admin is created.
	KafkaHost string
	KafkaPort int
	// producer publishes the messages of Send and SendLiveness.
	producer sarama.AsyncProducer
	// consumer is the master consumer, created in Start when consumerType == DefaultConsumerType.
	consumer sarama.Consumer
	// groupConsumers holds the group consumers, keyed by topic name.
	groupConsumers map[string]*scc.Consumer
	// lockOfGroupConsumers is the lock associated with groupConsumers.
	lockOfGroupConsumers sync.RWMutex
	// consumerGroupPrefix is prepended to every group id built by Subscribe.
	consumerGroupPrefix string
	// consumerType selects PartitionConsumer or GroupCustomer handling in Subscribe.
	consumerType int
	// consumerGroupName is set through the ConsumerGroupName option.
	consumerGroupName string
	// Producer and consumer tuning values, each set through the option of the same name.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	consumerMaxwait          int
	maxProcessingTime        int
	// Partition and replica counts used when Subscribe auto-creates a topic.
	numPartitions int
	numReplicas   int
	// autoCreateTopic makes Subscribe create the topic before attaching a partition consumer.
	autoCreateTopic bool
	// doneCh is created in Start and signalled in Stop to end long-running routines.
	doneCh chan int
	// metadataCallback is the callback registered through SubscribeForMetadata.
	metadataCallback func(fromTopic string, timestamp time.Time)
	// topicToConsumerChannelMap tracks the consumers/subscriber channels per topic; it is
	// created in Start (presumably keyed by topic name - confirm in getConsumerChannel).
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	// topicLockMap holds the per-topic mutexes used by lockTopic/unLockTopic and is itself
	// guarded by lockOfTopicLockMap.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	// metadataMaxRetry is set through the MetadatMaxRetries option.
	metadataMaxRetry int
	// Liveness state: the current value, the optional reporting channel (EnableLivenessChannel),
	// the minimum interval between reports of an unchanged state, and the time of the last report.
	alive                   bool
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	// started is true between a successful Start and the next Stop.
	started bool
	// Healthiness state and its optional reporting channel (EnableHealthinessChannel);
	// once healthy becomes false it is never set back to true.
	healthy     bool
	healthiness chan bool
}
86
87type SaramaClientOption func(*SaramaClient)
88
89func Host(host string) SaramaClientOption {
90 return func(args *SaramaClient) {
91 args.KafkaHost = host
92 }
93}
94
95func Port(port int) SaramaClientOption {
96 return func(args *SaramaClient) {
97 args.KafkaPort = port
98 }
99}
100
khenaidooca301322019-01-09 23:06:32 -0500101func ConsumerGroupPrefix(prefix string) SaramaClientOption {
102 return func(args *SaramaClient) {
103 args.consumerGroupPrefix = prefix
104 }
105}
106
107func ConsumerGroupName(name string) SaramaClientOption {
108 return func(args *SaramaClient) {
109 args.consumerGroupName = name
110 }
111}
112
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500113func ConsumerType(consumer int) SaramaClientOption {
114 return func(args *SaramaClient) {
115 args.consumerType = consumer
116 }
117}
118
119func ProducerFlushFrequency(frequency int) SaramaClientOption {
120 return func(args *SaramaClient) {
121 args.producerFlushFrequency = frequency
122 }
123}
124
125func ProducerFlushMessages(num int) SaramaClientOption {
126 return func(args *SaramaClient) {
127 args.producerFlushMessages = num
128 }
129}
130
131func ProducerFlushMaxMessages(num int) SaramaClientOption {
132 return func(args *SaramaClient) {
133 args.producerFlushMaxmessages = num
134 }
135}
136
khenaidoo90847922018-12-03 14:47:51 -0500137func ProducerMaxRetries(num int) SaramaClientOption {
138 return func(args *SaramaClient) {
139 args.producerRetryMax = num
140 }
141}
142
143func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
144 return func(args *SaramaClient) {
145 args.producerRetryBackOff = duration
146 }
147}
148
149func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500150 return func(args *SaramaClient) {
151 args.producerReturnErrors = opt
152 }
153}
154
khenaidoo90847922018-12-03 14:47:51 -0500155func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500156 return func(args *SaramaClient) {
157 args.producerReturnSuccess = opt
158 }
159}
160
161func ConsumerMaxWait(wait int) SaramaClientOption {
162 return func(args *SaramaClient) {
163 args.consumerMaxwait = wait
164 }
165}
166
167func MaxProcessingTime(pTime int) SaramaClientOption {
168 return func(args *SaramaClient) {
169 args.maxProcessingTime = pTime
170 }
171}
172
173func NumPartitions(number int) SaramaClientOption {
174 return func(args *SaramaClient) {
175 args.numPartitions = number
176 }
177}
178
179func NumReplicas(number int) SaramaClientOption {
180 return func(args *SaramaClient) {
181 args.numReplicas = number
182 }
183}
184
185func AutoCreateTopic(opt bool) SaramaClientOption {
186 return func(args *SaramaClient) {
187 args.autoCreateTopic = opt
188 }
189}
190
Abhilash S.L294ff522019-06-26 18:14:33 +0530191func MetadatMaxRetries(retry int) SaramaClientOption {
192 return func(args *SaramaClient) {
193 args.metadataMaxRetry = retry
194 }
195}
196
Scott Bakeree6a0872019-10-29 15:59:52 -0700197func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
198 return func(args *SaramaClient) {
199 args.livenessChannelInterval = opt
200 }
201}
202
khenaidoo43c82122018-11-22 18:38:28 -0500203func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
204 client := &SaramaClient{
205 KafkaHost: DefaultKafkaHost,
206 KafkaPort: DefaultKafkaPort,
207 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500208 client.consumerType = DefaultConsumerType
209 client.producerFlushFrequency = DefaultProducerFlushFrequency
210 client.producerFlushMessages = DefaultProducerFlushMessages
211 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
212 client.producerReturnErrors = DefaultProducerReturnErrors
213 client.producerReturnSuccess = DefaultProducerReturnSuccess
214 client.producerRetryMax = DefaultProducerRetryMax
215 client.producerRetryBackOff = DefaultProducerRetryBackoff
216 client.consumerMaxwait = DefaultConsumerMaxwait
217 client.maxProcessingTime = DefaultMaxProcessingTime
218 client.numPartitions = DefaultNumberPartitions
219 client.numReplicas = DefaultNumberReplicas
220 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530221 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700222 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500223
224 for _, option := range opts {
225 option(client)
226 }
227
khenaidooca301322019-01-09 23:06:32 -0500228 client.groupConsumers = make(map[string]*scc.Consumer)
229
khenaidoo43c82122018-11-22 18:38:28 -0500230 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500231 client.topicLockMap = make(map[string]*sync.RWMutex)
232 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500233 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700234
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800235 // healthy and alive until proven otherwise
Scott Bakeree6a0872019-10-29 15:59:52 -0700236 client.alive = true
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800237 client.healthy = true
Scott Bakeree6a0872019-10-29 15:59:52 -0700238
khenaidoo43c82122018-11-22 18:38:28 -0500239 return client
240}
241
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500242func (sc *SaramaClient) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800243 logger.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500244
245 // Create the Done channel
246 sc.doneCh = make(chan int, 1)
247
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500248 var err error
249
khenaidoob3244212019-08-27 14:32:27 -0400250 // Add a cleanup in case of failure to startup
251 defer func() {
252 if err != nil {
253 sc.Stop()
254 }
255 }()
256
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500257 // Create the Cluster Admin
258 if err = sc.createClusterAdmin(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800259 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500260 return err
261 }
262
khenaidoo43c82122018-11-22 18:38:28 -0500263 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500264 if err := sc.createPublisher(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800265 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500266 return err
267 }
268
khenaidooca301322019-01-09 23:06:32 -0500269 if sc.consumerType == DefaultConsumerType {
270 // Create the master consumers
271 if err := sc.createConsumer(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800272 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidooca301322019-01-09 23:06:32 -0500273 return err
274 }
khenaidoo43c82122018-11-22 18:38:28 -0500275 }
276
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500277 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500278 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
279
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800280 logger.Info("kafka-sarama-client-started")
khenaidooca301322019-01-09 23:06:32 -0500281
Scott Bakeree6a0872019-10-29 15:59:52 -0700282 sc.started = true
283
khenaidoo43c82122018-11-22 18:38:28 -0500284 return nil
285}
286
287func (sc *SaramaClient) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800288 logger.Info("stopping-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500289
Scott Bakeree6a0872019-10-29 15:59:52 -0700290 sc.started = false
291
khenaidoo43c82122018-11-22 18:38:28 -0500292 //Send a message over the done channel to close all long running routines
293 sc.doneCh <- 1
294
khenaidoo43c82122018-11-22 18:38:28 -0500295 if sc.producer != nil {
296 if err := sc.producer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800297 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500298 }
299 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500300
khenaidoo43c82122018-11-22 18:38:28 -0500301 if sc.consumer != nil {
302 if err := sc.consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800303 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500304 }
305 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500306
khenaidooca301322019-01-09 23:06:32 -0500307 for key, val := range sc.groupConsumers {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800308 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
khenaidooca301322019-01-09 23:06:32 -0500309 if err := val.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800310 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500311 }
312 }
313
314 if sc.cAdmin != nil {
315 if err := sc.cAdmin.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800316 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500317 }
318 }
319
320 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500321 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800323 logger.Info("sarama-client-stopped")
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500324}
325
khenaidooca301322019-01-09 23:06:32 -0500326//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
327// the invoking function must hold the lock
328func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500329 // Set the topic details
330 topicDetail := &sarama.TopicDetail{}
331 topicDetail.NumPartitions = int32(numPartition)
332 topicDetail.ReplicationFactor = int16(repFactor)
333 topicDetail.ConfigEntries = make(map[string]*string)
334 topicDetails := make(map[string]*sarama.TopicDetail)
335 topicDetails[topic.Name] = topicDetail
336
337 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
338 if err == sarama.ErrTopicAlreadyExists {
339 // Not an error
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800340 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500341 return nil
342 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800343 logger.Errorw("create-topic-failure", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500344 return err
345 }
346 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
347 // do so.
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800348 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500349 return nil
350}
351
khenaidooca301322019-01-09 23:06:32 -0500352//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
353// ensure no two go routines are performing operations on the same topic
354func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
355 sc.lockTopic(topic)
356 defer sc.unLockTopic(topic)
357
358 return sc.createTopic(topic, numPartition, repFactor)
359}
360
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500361//DeleteTopic removes a topic from the kafka Broker
362func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500363 sc.lockTopic(topic)
364 defer sc.unLockTopic(topic)
365
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500366 // Remove the topic from the broker
367 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
368 if err == sarama.ErrUnknownTopicOrPartition {
369 // Not an error as does not exist
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800370 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500371 return nil
372 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800373 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500374 return err
375 }
376
377 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
378 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800379 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500380 return err
381 }
382 return nil
383}
384
385// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
386// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500387func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500388 sc.lockTopic(topic)
389 defer sc.unLockTopic(topic)
390
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800391 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500392
393 // If a consumers already exist for that topic then resuse it
394 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800395 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500396 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500397 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500398 sc.addChannelToConsumerChannelMap(topic, ch)
399 return ch, nil
400 }
401
402 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500403 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500404 var err error
405
406 // Use the consumerType option to figure out the type of consumer to launch
407 if sc.consumerType == PartitionConsumer {
408 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500409 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800410 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500411 return nil, err
412 }
413 }
khenaidoo731697e2019-01-29 16:03:29 -0500414 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800415 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500416 return nil, err
417 }
418 } else if sc.consumerType == GroupCustomer {
419 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
420 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500421 //if sc.autoCreateTopic {
422 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800423 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500424 // return nil, err
425 // }
426 //}
427 //groupId := sc.consumerGroupName
428 groupId := getGroupId(kvArgs...)
429 // Include the group prefix
430 if groupId != "" {
431 groupId = sc.consumerGroupPrefix + groupId
432 } else {
433 // Need to use a unique group Id per topic
434 groupId = sc.consumerGroupPrefix + topic.Name
435 }
khenaidoo731697e2019-01-29 16:03:29 -0500436 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800437 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500438 return nil, err
439 }
khenaidooca301322019-01-09 23:06:32 -0500440
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500441 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800442 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500443 return nil, errors.New("unknown-consumer-type")
444 }
445
446 return consumerListeningChannel, nil
447}
448
449//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500450func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500451 sc.lockTopic(topic)
452 defer sc.unLockTopic(topic)
453
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800454 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500455 var err error
456 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800457 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500458 }
459 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800460 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500461 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500462 return err
463}
464
Scott Baker504b4802020-04-17 10:12:20 -0700465func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
npujar467fe752020-01-16 20:17:45 +0530466 sc.metadataCallback = callback
467}
468
Scott Bakeree6a0872019-10-29 15:59:52 -0700469func (sc *SaramaClient) updateLiveness(alive bool) {
470 // Post a consistent stream of liveness data to the channel,
471 // so that in a live state, the core does not timeout and
472 // send a forced liveness message. Production of liveness
473 // events to the channel is rate-limited by livenessChannelInterval.
474 if sc.liveness != nil {
475 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800476 logger.Info("update-liveness-channel-because-change")
Scott Bakeree6a0872019-10-29 15:59:52 -0700477 sc.liveness <- alive
478 sc.lastLivenessTime = time.Now()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000479 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800480 logger.Info("update-liveness-channel-because-interval")
Scott Bakeree6a0872019-10-29 15:59:52 -0700481 sc.liveness <- alive
482 sc.lastLivenessTime = time.Now()
483 }
484 }
485
486 // Only emit a log message when the state changes
487 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800488 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Bakeree6a0872019-10-29 15:59:52 -0700489 sc.alive = alive
490 }
491}
492
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800493// Once unhealthy, we never go back
494func (sc *SaramaClient) setUnhealthy() {
495 sc.healthy = false
496 if sc.healthiness != nil {
497 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
498 sc.healthiness <- sc.healthy
499 }
500}
501
Devmalya Paulc594bb32019-11-06 07:34:27 +0000502func (sc *SaramaClient) isLivenessError(err error) bool {
503 // Sarama producers and consumers encapsulate the error inside
504 // a ProducerError or ConsumerError struct.
505 if prodError, ok := err.(*sarama.ProducerError); ok {
506 err = prodError.Err
507 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
508 err = consumerError.Err
509 }
510
511 // Sarama-Cluster will compose the error into a ClusterError struct,
512 // which we can't do a compare by reference. To handle that, we the
513 // best we can do is compare the error strings.
514
515 switch err.Error() {
516 case context.DeadlineExceeded.Error():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800517 logger.Info("is-liveness-error-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000518 return true
519 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800520 logger.Info("is-liveness-error-no-brokers")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000521 return true
522 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800523 logger.Info("is-liveness-error-shutting-down")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000524 return true
525 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800526 logger.Info("is-liveness-error-not-available")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000527 return true
528 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800529 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000530 return true
531 }
532
533 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800534 logger.Info("is-liveness-error-connection-refused")
535 return true
536 }
537
538 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
539 logger.Info("is-liveness-error-io-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000540 return true
541 }
542
543 // Other errors shouldn't trigger a loss of liveness
544
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800545 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000546
547 return false
548}
549
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500550// send formats and sends the request onto the kafka messaging bus.
551func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
552
553 // Assert message is a proto message
554 var protoMsg proto.Message
555 var ok bool
556 // ascertain the value interface type is a proto.Message
557 if protoMsg, ok = msg.(proto.Message); !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800558 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000559 return fmt.Errorf("not-a-proto-msg-%s", msg)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500560 }
561
562 var marshalled []byte
563 var err error
564 // Create the Sarama producer message
565 if marshalled, err = proto.Marshal(protoMsg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800566 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500567 return err
568 }
569 key := ""
570 if len(keys) > 0 {
571 key = keys[0] // Only the first key is relevant
572 }
573 kafkaMsg := &sarama.ProducerMessage{
574 Topic: topic.Name,
575 Key: sarama.StringEncoder(key),
576 Value: sarama.ByteEncoder(marshalled),
577 }
578
579 // Send message to kafka
580 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500581 // Wait for result
582 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
583 select {
584 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800585 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700586 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500587 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800588 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000589 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700590 sc.updateLiveness(false)
591 }
592 return notOk
593 }
594 return nil
595}
596
597// Enable the liveness monitor channel. This channel will report
598// a "true" or "false" on every publish, which indicates whether
599// or not the channel is still live. This channel is then picked up
600// by the service (i.e. rw_core / ro_core) to update readiness status
601// and/or take other actions.
602func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800603 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Bakeree6a0872019-10-29 15:59:52 -0700604 if enable {
605 if sc.liveness == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800606 logger.Info("kafka-create-liveness-channel")
Scott Bakeree6a0872019-10-29 15:59:52 -0700607 // At least 1, so we can immediately post to it without blocking
608 // Setting a bigger number (10) allows the monitor to fall behind
609 // without blocking others. The monitor shouldn't really fall
610 // behind...
611 sc.liveness = make(chan bool, 10)
612 // post intial state to the channel
613 sc.liveness <- sc.alive
614 }
615 } else {
616 // TODO: Think about whether we need the ability to turn off
617 // liveness monitoring
618 panic("Turning off liveness reporting is not supported")
619 }
620 return sc.liveness
621}
622
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800623// Enable the Healthiness monitor channel. This channel will report "false"
624// if the kafka consumers die, or some other problem occurs which is
625// catastrophic that would require re-creating the client.
626func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
627 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
628 if enable {
629 if sc.healthiness == nil {
630 logger.Info("kafka-create-healthiness-channel")
631 // At least 1, so we can immediately post to it without blocking
632 // Setting a bigger number (10) allows the monitor to fall behind
633 // without blocking others. The monitor shouldn't really fall
634 // behind...
635 sc.healthiness = make(chan bool, 10)
636 // post intial state to the channel
637 sc.healthiness <- sc.healthy
638 }
639 } else {
640 // TODO: Think about whether we need the ability to turn off
641 // liveness monitoring
642 panic("Turning off healthiness reporting is not supported")
643 }
644 return sc.healthiness
645}
646
Scott Bakeree6a0872019-10-29 15:59:52 -0700647// send an empty message on the liveness channel to check whether connectivity has
648// been restored.
649func (sc *SaramaClient) SendLiveness() error {
650 if !sc.started {
651 return fmt.Errorf("SendLiveness() called while not started")
652 }
653
654 kafkaMsg := &sarama.ProducerMessage{
655 Topic: "_liveness_test",
656 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
657 }
658
659 // Send message to kafka
660 sc.producer.Input() <- kafkaMsg
661 // Wait for result
662 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
663 select {
664 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800665 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700666 sc.updateLiveness(true)
667 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800668 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000669 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700670 sc.updateLiveness(false)
671 }
khenaidoo90847922018-12-03 14:47:51 -0500672 return notOk
673 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 return nil
675}
676
khenaidooca301322019-01-09 23:06:32 -0500677// getGroupId returns the group id from the key-value args.
678func getGroupId(kvArgs ...*KVArg) string {
679 for _, arg := range kvArgs {
680 if arg.Key == GroupIdKey {
681 return arg.Value.(string)
682 }
683 }
684 return ""
685}
686
khenaidoo731697e2019-01-29 16:03:29 -0500687// getOffset returns the offset from the key-value args.
688func getOffset(kvArgs ...*KVArg) int64 {
689 for _, arg := range kvArgs {
690 if arg.Key == Offset {
691 return arg.Value.(int64)
692 }
693 }
694 return sarama.OffsetNewest
695}
696
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500697func (sc *SaramaClient) createClusterAdmin() error {
698 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
699 config := sarama.NewConfig()
700 config.Version = sarama.V1_0_0_0
701
702 // Create a cluster Admin
703 var cAdmin sarama.ClusterAdmin
704 var err error
705 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800706 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500707 return err
708 }
709 sc.cAdmin = cAdmin
710 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500711}
712
khenaidood2b6df92018-12-13 16:37:20 -0500713func (sc *SaramaClient) lockTopic(topic *Topic) {
714 sc.lockOfTopicLockMap.Lock()
715 if _, exist := sc.topicLockMap[topic.Name]; exist {
716 sc.lockOfTopicLockMap.Unlock()
717 sc.topicLockMap[topic.Name].Lock()
718 } else {
719 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
720 sc.lockOfTopicLockMap.Unlock()
721 sc.topicLockMap[topic.Name].Lock()
722 }
723}
724
725func (sc *SaramaClient) unLockTopic(topic *Topic) {
726 sc.lockOfTopicLockMap.Lock()
727 defer sc.lockOfTopicLockMap.Unlock()
728 if _, exist := sc.topicLockMap[topic.Name]; exist {
729 sc.topicLockMap[topic.Name].Unlock()
730 }
731}
732
khenaidoo43c82122018-11-22 18:38:28 -0500733func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
734 sc.lockTopicToConsumerChannelMap.Lock()
735 defer sc.lockTopicToConsumerChannelMap.Unlock()
736 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
737 sc.topicToConsumerChannelMap[id] = arg
738 }
739}
740
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500741func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400742 sc.lockTopicToConsumerChannelMap.RLock()
743 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500744
745 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
746 return consumerCh
747 }
748 return nil
749}
750
khenaidoo79232702018-12-04 11:00:41 -0500751func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500752 sc.lockTopicToConsumerChannelMap.Lock()
753 defer sc.lockTopicToConsumerChannelMap.Unlock()
754 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
755 consumerCh.channels = append(consumerCh.channels, ch)
756 return
757 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800758 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500759}
760
761//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
762func closeConsumers(consumers []interface{}) error {
763 var err error
764 for _, consumer := range consumers {
765 // Is it a partition consumers?
766 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
767 if errTemp := partionConsumer.Close(); errTemp != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800768 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500769 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
770 // This can occur on race condition
771 err = nil
772 } else {
773 err = errTemp
774 }
775 }
776 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
777 if errTemp := groupConsumer.Close(); errTemp != nil {
778 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
779 // This can occur on race condition
780 err = nil
781 } else {
782 err = errTemp
783 }
784 }
785 }
786 }
787 return err
khenaidoo43c82122018-11-22 18:38:28 -0500788}
789
khenaidoo79232702018-12-04 11:00:41 -0500790func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500791 sc.lockTopicToConsumerChannelMap.Lock()
792 defer sc.lockTopicToConsumerChannelMap.Unlock()
793 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
794 // Channel will be closed in the removeChannel method
795 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500796 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500797 if len(consumerCh.channels) == 0 {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800798 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500799 err := closeConsumers(consumerCh.consumers)
800 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500801 delete(sc.topicToConsumerChannelMap, topic.Name)
802 return err
803 }
804 return nil
805 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800806 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500807 return errors.New("topic-does-not-exist")
808}
809
810func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
811 sc.lockTopicToConsumerChannelMap.Lock()
812 defer sc.lockTopicToConsumerChannelMap.Unlock()
813 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
814 for _, ch := range consumerCh.channels {
815 // Channel will be closed in the removeChannel method
816 removeChannel(consumerCh.channels, ch)
817 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500818 err := closeConsumers(consumerCh.consumers)
819 //if err == sarama.ErrUnknownTopicOrPartition {
820 // // Not an error
821 // err = nil
822 //}
823 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500824 delete(sc.topicToConsumerChannelMap, topic.Name)
825 return err
826 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800827 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500828 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500829}
830
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500831//createPublisher creates the publisher which is used to send a message onto kafka
832func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500833 // This Creates the publisher
834 config := sarama.NewConfig()
835 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500836 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
837 config.Producer.Flush.Messages = sc.producerFlushMessages
838 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
839 config.Producer.Return.Errors = sc.producerReturnErrors
840 config.Producer.Return.Successes = sc.producerReturnSuccess
841 //config.Producer.RequiredAcks = sarama.WaitForAll
842 config.Producer.RequiredAcks = sarama.WaitForLocal
843
khenaidoo43c82122018-11-22 18:38:28 -0500844 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
845 brokers := []string{kafkaFullAddr}
846
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500847 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800848 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500849 return err
850 } else {
851 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500852 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800853 logger.Info("Kafka-publisher-created")
khenaidoo43c82122018-11-22 18:38:28 -0500854 return nil
855}
856
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500857func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500858 config := sarama.NewConfig()
859 config.Consumer.Return.Errors = true
860 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500861 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
862 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500863 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530864 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500865 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
866 brokers := []string{kafkaFullAddr}
867
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500868 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800869 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500870 return err
871 } else {
872 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500873 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800874 logger.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500875 return nil
876}
877
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500878// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500879func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500880 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500881 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500882 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700883 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
884 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500885 //config.Group.Return.Notifications = false
886 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
887 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500888 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500889 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500890 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
891 brokers := []string{kafkaFullAddr}
892
khenaidoo43c82122018-11-22 18:38:28 -0500893 topics := []string{topic.Name}
894 var consumer *scc.Consumer
895 var err error
896
khenaidooca301322019-01-09 23:06:32 -0500897 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800898 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500899 return nil, err
900 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800901 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500902
903 //sc.groupConsumers[topic.Name] = consumer
904 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500905 return consumer, nil
906}
907
khenaidoo43c82122018-11-22 18:38:28 -0500908// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500909// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500910func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500911 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400912 sc.lockTopicToConsumerChannelMap.RLock()
khenaidoo43c82122018-11-22 18:38:28 -0500913 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500914 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500915 c <- protoMessage
916 }(ch)
917 }
npujar467fe752020-01-16 20:17:45 +0530918 sc.lockTopicToConsumerChannelMap.RUnlock()
919
920 if callback := sc.metadataCallback; callback != nil {
Scott Baker504b4802020-04-17 10:12:20 -0700921 ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
922 callback(protoMessage.Header.FromTopic, ts)
npujar467fe752020-01-16 20:17:45 +0530923 }
khenaidoo43c82122018-11-22 18:38:28 -0500924}
925
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500926func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800927 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500928startloop:
929 for {
930 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500931 case err, ok := <-consumer.Errors():
932 if ok {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500933 if sc.isLivenessError(err) {
934 sc.updateLiveness(false)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800935 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500936 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500937 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500938 // Channel is closed
939 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500940 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500941 case msg, ok := <-consumer.Messages():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800942 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500943 if !ok {
944 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500945 break startloop
946 }
947 msgBody := msg.Value
khenaidoo6e55d9e2019-12-12 18:26:26 -0500948 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800949 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500950 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500951 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800952 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500953 continue
954 }
955 go sc.dispatchToConsumers(consumerChnls, icm)
956 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800957 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500958 break startloop
959 }
960 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800961 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
962 sc.setUnhealthy()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500963}
964
965func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800966 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500967
968startloop:
969 for {
970 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500971 case err, ok := <-consumer.Errors():
972 if ok {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000973 if sc.isLivenessError(err) {
974 sc.updateLiveness(false)
975 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800976 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500977 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800978 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500979 // channel is closed
980 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500981 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500982 case msg, ok := <-consumer.Messages():
983 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800984 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500985 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500986 break startloop
987 }
Scott Bakeree6a0872019-10-29 15:59:52 -0700988 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800989 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500990 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500991 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500992 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800993 logger.Warnw("invalid-message", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500994 continue
995 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500996 go sc.dispatchToConsumers(consumerChnls, icm)
997 consumer.MarkOffset(msg, "")
998 case ntf := <-consumer.Notifications():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800999 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -05001000 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001001 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001002 break startloop
1003 }
1004 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001005 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
1006 sc.setUnhealthy()
khenaidoo43c82122018-11-22 18:38:28 -05001007}
1008
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001009func (sc *SaramaClient) startConsumers(topic *Topic) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001010 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001011 var consumerCh *consumerChannels
1012 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001013 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001014 return errors.New("consumers-not-exist")
1015 }
1016 // For each consumer listening for that topic, start a consumption loop
1017 for _, consumer := range consumerCh.consumers {
1018 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1019 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1020 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1021 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1022 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001023 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001024 return errors.New("invalid-consumer")
1025 }
1026 }
1027 return nil
1028}
1029
1030//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1031//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -05001032func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001033 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001034 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001035
khenaidoo7ff26c72019-01-16 14:55:48 -05001036 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001037 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001038 return nil, err
1039 }
1040
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001041 consumersIf := make([]interface{}, 0)
1042 for _, pConsumer := range pConsumers {
1043 consumersIf = append(consumersIf, pConsumer)
1044 }
1045
1046 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001047 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001048 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001049 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001050 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001051 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001052 }
1053
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001054 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001055 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1056
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001057 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001058 go func() {
1059 if err := sc.startConsumers(topic); err != nil {
1060 logger.Errorw("start-consumers-failed", log.Fields{
1061 "topic": topic,
1062 "error": err})
1063 }
1064 }()
khenaidoo43c82122018-11-22 18:38:28 -05001065
1066 return consumerListeningChannel, nil
1067}
1068
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001069// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1070// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -05001071func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001072 // TODO: Replace this development partition consumers with a group consumers
1073 var pConsumer *scc.Consumer
1074 var err error
khenaidoo731697e2019-01-29 16:03:29 -05001075 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001076 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001077 return nil, err
1078 }
1079 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1080 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001081 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001082 cc := &consumerChannels{
1083 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001084 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001085 }
1086
1087 // Add the consumers channel to the map
1088 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1089
1090 //Start a consumers to listen on that specific topic
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +00001091 go func() {
1092 if err := sc.startConsumers(topic); err != nil {
1093 logger.Errorw("start-consumers-failed", log.Fields{
1094 "topic": topic,
1095 "error": err})
1096 }
1097 }()
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001098
1099 return consumerListeningChannel, nil
1100}
1101
khenaidoo7ff26c72019-01-16 14:55:48 -05001102func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001103 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001104 partitionList, err := sc.consumer.Partitions(topic.Name)
1105 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001106 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001107 return nil, err
1108 }
1109
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001110 pConsumers := make([]sarama.PartitionConsumer, 0)
1111 for _, partition := range partitionList {
1112 var pConsumer sarama.PartitionConsumer
1113 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001114 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001115 return nil, err
1116 }
1117 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001118 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001119 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001120}
1121
khenaidoo79232702018-12-04 11:00:41 -05001122func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001123 var i int
khenaidoo79232702018-12-04 11:00:41 -05001124 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001125 for i, channel = range channels {
1126 if channel == ch {
1127 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1128 close(channel)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001129 logger.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001130 return channels[:len(channels)-1]
1131 }
1132 }
1133 return channels
1134}
khenaidoo7ff26c72019-01-16 14:55:48 -05001135
khenaidoo7ff26c72019-01-16 14:55:48 -05001136func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1137 sc.lockOfGroupConsumers.Lock()
1138 defer sc.lockOfGroupConsumers.Unlock()
1139 if _, exist := sc.groupConsumers[topic]; !exist {
1140 sc.groupConsumers[topic] = consumer
1141 }
1142}
1143
1144func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1145 sc.lockOfGroupConsumers.Lock()
1146 defer sc.lockOfGroupConsumers.Unlock()
1147 if _, exist := sc.groupConsumers[topic]; exist {
1148 consumer := sc.groupConsumers[topic]
1149 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001150 if err := consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001151 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -05001152 return err
1153 }
1154 }
1155 return nil
1156}