blob: 9d4ab5209848b523ab625f0e1b574b202462e6df [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "strings"
23 "sync"
24 "time"
25
Scott Bakerf2596722019-09-27 12:39:56 -070026 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050027 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000028 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050029 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050030 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo43c82122018-11-22 18:38:28 -050033)
34
// returnErrorFunction is the signature of a callback that takes no arguments
// and reports its outcome as an error.
type returnErrorFunction func() error
36
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is
// received on that topic, the consumer(s) broadcast the message to all the listening channels. The
// consumer can be a partition consumer or a group consumer, which is why the consumers are held as
// untyped values.
type consumerChannels struct {
	consumers []interface{}
	channels  []chan *ic.InterContainerMessage
}
44
// SaramaClient represents the messaging proxy. It holds the sarama handles
// (cluster admin, producer, consumers), the configuration values populated by
// NewSaramaClient and the option functions, and the bookkeeping used for
// topic subscriptions and for liveness/healthiness reporting.
type SaramaClient struct {
	// Sarama handles and the broker endpoint. KafkaHost and KafkaPort are
	// joined as "host:port" when the cluster admin is created.
	cAdmin    sarama.ClusterAdmin
	client    sarama.Client
	KafkaHost string
	KafkaPort int
	producer  sarama.AsyncProducer
	consumer  sarama.Consumer

	// Group consumers keyed by topic name, plus the group/consumer settings.
	// NOTE(review): lockOfGroupConsumers presumably guards groupConsumers --
	// confirm against the helpers that mutate the map.
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	consumerGroupPrefix  string
	consumerType         int
	consumerGroupName    string

	// Producer/consumer/topic configuration. Defaults are assigned in
	// NewSaramaClient and may be overridden through SaramaClientOption values.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	consumerMaxwait          int
	maxProcessingTime        int
	numPartitions            int
	numReplicas              int
	autoCreateTopic          bool

	// doneCh is created (buffered, size 1) by Start; Stop sends on it to
	// tell long running routines to exit.
	doneCh chan int

	// Subscription bookkeeping: topic name -> consumers and listening channels.
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex

	// Per-topic locks, created lazily by lockTopic; lockOfTopicLockMap guards
	// the map itself.
	topicLockMap       map[string]*sync.RWMutex
	lockOfTopicLockMap sync.RWMutex
	metadataMaxRetry   int

	// Liveness reporting: alive is the last recorded state, liveness is the
	// optional channel it is published on, and lastLivenessTime together with
	// livenessChannelInterval rate-limits unchanged-state publications.
	alive                   bool
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time

	// started is set by Start and cleared by Stop.
	started bool

	// Healthiness reporting: healthy is cleared by setUnhealthy and published
	// on the optional healthiness channel.
	healthy     bool
	healthiness chan bool
}
84
// SaramaClientOption is a functional option: a function that adjusts one
// setting on the SaramaClient it is given. NewSaramaClient applies the
// supplied options, in order, after the defaults have been set.
type SaramaClientOption func(*SaramaClient)
86
87func Host(host string) SaramaClientOption {
88 return func(args *SaramaClient) {
89 args.KafkaHost = host
90 }
91}
92
93func Port(port int) SaramaClientOption {
94 return func(args *SaramaClient) {
95 args.KafkaPort = port
96 }
97}
98
khenaidooca301322019-01-09 23:06:32 -050099func ConsumerGroupPrefix(prefix string) SaramaClientOption {
100 return func(args *SaramaClient) {
101 args.consumerGroupPrefix = prefix
102 }
103}
104
105func ConsumerGroupName(name string) SaramaClientOption {
106 return func(args *SaramaClient) {
107 args.consumerGroupName = name
108 }
109}
110
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500111func ConsumerType(consumer int) SaramaClientOption {
112 return func(args *SaramaClient) {
113 args.consumerType = consumer
114 }
115}
116
117func ProducerFlushFrequency(frequency int) SaramaClientOption {
118 return func(args *SaramaClient) {
119 args.producerFlushFrequency = frequency
120 }
121}
122
123func ProducerFlushMessages(num int) SaramaClientOption {
124 return func(args *SaramaClient) {
125 args.producerFlushMessages = num
126 }
127}
128
129func ProducerFlushMaxMessages(num int) SaramaClientOption {
130 return func(args *SaramaClient) {
131 args.producerFlushMaxmessages = num
132 }
133}
134
khenaidoo90847922018-12-03 14:47:51 -0500135func ProducerMaxRetries(num int) SaramaClientOption {
136 return func(args *SaramaClient) {
137 args.producerRetryMax = num
138 }
139}
140
141func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
142 return func(args *SaramaClient) {
143 args.producerRetryBackOff = duration
144 }
145}
146
147func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500148 return func(args *SaramaClient) {
149 args.producerReturnErrors = opt
150 }
151}
152
khenaidoo90847922018-12-03 14:47:51 -0500153func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500154 return func(args *SaramaClient) {
155 args.producerReturnSuccess = opt
156 }
157}
158
159func ConsumerMaxWait(wait int) SaramaClientOption {
160 return func(args *SaramaClient) {
161 args.consumerMaxwait = wait
162 }
163}
164
165func MaxProcessingTime(pTime int) SaramaClientOption {
166 return func(args *SaramaClient) {
167 args.maxProcessingTime = pTime
168 }
169}
170
171func NumPartitions(number int) SaramaClientOption {
172 return func(args *SaramaClient) {
173 args.numPartitions = number
174 }
175}
176
177func NumReplicas(number int) SaramaClientOption {
178 return func(args *SaramaClient) {
179 args.numReplicas = number
180 }
181}
182
183func AutoCreateTopic(opt bool) SaramaClientOption {
184 return func(args *SaramaClient) {
185 args.autoCreateTopic = opt
186 }
187}
188
Abhilash S.L294ff522019-06-26 18:14:33 +0530189func MetadatMaxRetries(retry int) SaramaClientOption {
190 return func(args *SaramaClient) {
191 args.metadataMaxRetry = retry
192 }
193}
194
Scott Bakeree6a0872019-10-29 15:59:52 -0700195func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
196 return func(args *SaramaClient) {
197 args.livenessChannelInterval = opt
198 }
199}
200
khenaidoo43c82122018-11-22 18:38:28 -0500201func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
202 client := &SaramaClient{
203 KafkaHost: DefaultKafkaHost,
204 KafkaPort: DefaultKafkaPort,
205 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500206 client.consumerType = DefaultConsumerType
207 client.producerFlushFrequency = DefaultProducerFlushFrequency
208 client.producerFlushMessages = DefaultProducerFlushMessages
209 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
210 client.producerReturnErrors = DefaultProducerReturnErrors
211 client.producerReturnSuccess = DefaultProducerReturnSuccess
212 client.producerRetryMax = DefaultProducerRetryMax
213 client.producerRetryBackOff = DefaultProducerRetryBackoff
214 client.consumerMaxwait = DefaultConsumerMaxwait
215 client.maxProcessingTime = DefaultMaxProcessingTime
216 client.numPartitions = DefaultNumberPartitions
217 client.numReplicas = DefaultNumberReplicas
218 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530219 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700220 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500221
222 for _, option := range opts {
223 option(client)
224 }
225
khenaidooca301322019-01-09 23:06:32 -0500226 client.groupConsumers = make(map[string]*scc.Consumer)
227
khenaidoo43c82122018-11-22 18:38:28 -0500228 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500229 client.topicLockMap = make(map[string]*sync.RWMutex)
230 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500231 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700232
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800233 // healthy and alive until proven otherwise
Scott Bakeree6a0872019-10-29 15:59:52 -0700234 client.alive = true
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800235 client.healthy = true
Scott Bakeree6a0872019-10-29 15:59:52 -0700236
khenaidoo43c82122018-11-22 18:38:28 -0500237 return client
238}
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240func (sc *SaramaClient) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800241 logger.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500242
243 // Create the Done channel
244 sc.doneCh = make(chan int, 1)
245
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500246 var err error
247
khenaidoob3244212019-08-27 14:32:27 -0400248 // Add a cleanup in case of failure to startup
249 defer func() {
250 if err != nil {
251 sc.Stop()
252 }
253 }()
254
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500255 // Create the Cluster Admin
256 if err = sc.createClusterAdmin(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800257 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500258 return err
259 }
260
khenaidoo43c82122018-11-22 18:38:28 -0500261 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500262 if err := sc.createPublisher(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800263 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500264 return err
265 }
266
khenaidooca301322019-01-09 23:06:32 -0500267 if sc.consumerType == DefaultConsumerType {
268 // Create the master consumers
269 if err := sc.createConsumer(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800270 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidooca301322019-01-09 23:06:32 -0500271 return err
272 }
khenaidoo43c82122018-11-22 18:38:28 -0500273 }
274
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500275 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500276 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
277
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800278 logger.Info("kafka-sarama-client-started")
khenaidooca301322019-01-09 23:06:32 -0500279
Scott Bakeree6a0872019-10-29 15:59:52 -0700280 sc.started = true
281
khenaidoo43c82122018-11-22 18:38:28 -0500282 return nil
283}
284
285func (sc *SaramaClient) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800286 logger.Info("stopping-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500287
Scott Bakeree6a0872019-10-29 15:59:52 -0700288 sc.started = false
289
khenaidoo43c82122018-11-22 18:38:28 -0500290 //Send a message over the done channel to close all long running routines
291 sc.doneCh <- 1
292
khenaidoo43c82122018-11-22 18:38:28 -0500293 if sc.producer != nil {
294 if err := sc.producer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800295 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500296 }
297 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500298
khenaidoo43c82122018-11-22 18:38:28 -0500299 if sc.consumer != nil {
300 if err := sc.consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800301 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500302 }
303 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500304
khenaidooca301322019-01-09 23:06:32 -0500305 for key, val := range sc.groupConsumers {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800306 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
khenaidooca301322019-01-09 23:06:32 -0500307 if err := val.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800308 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500309 }
310 }
311
312 if sc.cAdmin != nil {
313 if err := sc.cAdmin.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800314 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500315 }
316 }
317
318 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500319 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500320
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800321 logger.Info("sarama-client-stopped")
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500322}
323
khenaidooca301322019-01-09 23:06:32 -0500324//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
325// the invoking function must hold the lock
326func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500327 // Set the topic details
328 topicDetail := &sarama.TopicDetail{}
329 topicDetail.NumPartitions = int32(numPartition)
330 topicDetail.ReplicationFactor = int16(repFactor)
331 topicDetail.ConfigEntries = make(map[string]*string)
332 topicDetails := make(map[string]*sarama.TopicDetail)
333 topicDetails[topic.Name] = topicDetail
334
335 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
336 if err == sarama.ErrTopicAlreadyExists {
337 // Not an error
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800338 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500339 return nil
340 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800341 logger.Errorw("create-topic-failure", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500342 return err
343 }
344 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
345 // do so.
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800346 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500347 return nil
348}
349
khenaidooca301322019-01-09 23:06:32 -0500350//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
351// ensure no two go routines are performing operations on the same topic
352func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
353 sc.lockTopic(topic)
354 defer sc.unLockTopic(topic)
355
356 return sc.createTopic(topic, numPartition, repFactor)
357}
358
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500359//DeleteTopic removes a topic from the kafka Broker
360func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500361 sc.lockTopic(topic)
362 defer sc.unLockTopic(topic)
363
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500364 // Remove the topic from the broker
365 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
366 if err == sarama.ErrUnknownTopicOrPartition {
367 // Not an error as does not exist
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800368 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500369 return nil
370 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800371 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500372 return err
373 }
374
375 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
376 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800377 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500378 return err
379 }
380 return nil
381}
382
383// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
384// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500385func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500386 sc.lockTopic(topic)
387 defer sc.unLockTopic(topic)
388
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800389 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500390
391 // If a consumers already exist for that topic then resuse it
392 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800393 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500394 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500395 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500396 sc.addChannelToConsumerChannelMap(topic, ch)
397 return ch, nil
398 }
399
400 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500401 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500402 var err error
403
404 // Use the consumerType option to figure out the type of consumer to launch
405 if sc.consumerType == PartitionConsumer {
406 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500407 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800408 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500409 return nil, err
410 }
411 }
khenaidoo731697e2019-01-29 16:03:29 -0500412 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800413 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500414 return nil, err
415 }
416 } else if sc.consumerType == GroupCustomer {
417 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
418 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500419 //if sc.autoCreateTopic {
420 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800421 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500422 // return nil, err
423 // }
424 //}
425 //groupId := sc.consumerGroupName
426 groupId := getGroupId(kvArgs...)
427 // Include the group prefix
428 if groupId != "" {
429 groupId = sc.consumerGroupPrefix + groupId
430 } else {
431 // Need to use a unique group Id per topic
432 groupId = sc.consumerGroupPrefix + topic.Name
433 }
khenaidoo731697e2019-01-29 16:03:29 -0500434 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800435 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500436 return nil, err
437 }
khenaidooca301322019-01-09 23:06:32 -0500438
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500439 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800440 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500441 return nil, errors.New("unknown-consumer-type")
442 }
443
444 return consumerListeningChannel, nil
445}
446
447//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500448func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500449 sc.lockTopic(topic)
450 defer sc.unLockTopic(topic)
451
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800452 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500453 var err error
454 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800455 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500456 }
457 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800458 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500459 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500460 return err
461}
462
Scott Bakeree6a0872019-10-29 15:59:52 -0700463func (sc *SaramaClient) updateLiveness(alive bool) {
464 // Post a consistent stream of liveness data to the channel,
465 // so that in a live state, the core does not timeout and
466 // send a forced liveness message. Production of liveness
467 // events to the channel is rate-limited by livenessChannelInterval.
468 if sc.liveness != nil {
469 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800470 logger.Info("update-liveness-channel-because-change")
Scott Bakeree6a0872019-10-29 15:59:52 -0700471 sc.liveness <- alive
472 sc.lastLivenessTime = time.Now()
473 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800474 logger.Info("update-liveness-channel-because-interval")
Scott Bakeree6a0872019-10-29 15:59:52 -0700475 sc.liveness <- alive
476 sc.lastLivenessTime = time.Now()
477 }
478 }
479
480 // Only emit a log message when the state changes
481 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800482 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Bakeree6a0872019-10-29 15:59:52 -0700483 sc.alive = alive
484 }
485}
486
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800487// Once unhealthy, we never go back
488func (sc *SaramaClient) setUnhealthy() {
489 sc.healthy = false
490 if sc.healthiness != nil {
491 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
492 sc.healthiness <- sc.healthy
493 }
494}
495
Devmalya Paulc594bb32019-11-06 07:34:27 +0000496func (sc *SaramaClient) isLivenessError(err error) bool {
497 // Sarama producers and consumers encapsulate the error inside
498 // a ProducerError or ConsumerError struct.
499 if prodError, ok := err.(*sarama.ProducerError); ok {
500 err = prodError.Err
501 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
502 err = consumerError.Err
503 }
504
505 // Sarama-Cluster will compose the error into a ClusterError struct,
506 // which we can't do a compare by reference. To handle that, we the
507 // best we can do is compare the error strings.
508
509 switch err.Error() {
510 case context.DeadlineExceeded.Error():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800511 logger.Info("is-liveness-error-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000512 return true
513 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800514 logger.Info("is-liveness-error-no-brokers")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000515 return true
516 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800517 logger.Info("is-liveness-error-shutting-down")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000518 return true
519 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800520 logger.Info("is-liveness-error-not-available")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000521 return true
522 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800523 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000524 return true
525 }
526
527 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800528 logger.Info("is-liveness-error-connection-refused")
529 return true
530 }
531
532 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
533 logger.Info("is-liveness-error-io-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000534 return true
535 }
536
537 // Other errors shouldn't trigger a loss of liveness
538
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800539 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000540
541 return false
542}
543
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500544// send formats and sends the request onto the kafka messaging bus.
545func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
546
547 // Assert message is a proto message
548 var protoMsg proto.Message
549 var ok bool
550 // ascertain the value interface type is a proto.Message
551 if protoMsg, ok = msg.(proto.Message); !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800552 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500553 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
554 }
555
556 var marshalled []byte
557 var err error
558 // Create the Sarama producer message
559 if marshalled, err = proto.Marshal(protoMsg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800560 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500561 return err
562 }
563 key := ""
564 if len(keys) > 0 {
565 key = keys[0] // Only the first key is relevant
566 }
567 kafkaMsg := &sarama.ProducerMessage{
568 Topic: topic.Name,
569 Key: sarama.StringEncoder(key),
570 Value: sarama.ByteEncoder(marshalled),
571 }
572
573 // Send message to kafka
574 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500575 // Wait for result
576 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
577 select {
578 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800579 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700580 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500581 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800582 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000583 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700584 sc.updateLiveness(false)
585 }
586 return notOk
587 }
588 return nil
589}
590
591// Enable the liveness monitor channel. This channel will report
592// a "true" or "false" on every publish, which indicates whether
593// or not the channel is still live. This channel is then picked up
594// by the service (i.e. rw_core / ro_core) to update readiness status
595// and/or take other actions.
596func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800597 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Bakeree6a0872019-10-29 15:59:52 -0700598 if enable {
599 if sc.liveness == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800600 logger.Info("kafka-create-liveness-channel")
Scott Bakeree6a0872019-10-29 15:59:52 -0700601 // At least 1, so we can immediately post to it without blocking
602 // Setting a bigger number (10) allows the monitor to fall behind
603 // without blocking others. The monitor shouldn't really fall
604 // behind...
605 sc.liveness = make(chan bool, 10)
606 // post intial state to the channel
607 sc.liveness <- sc.alive
608 }
609 } else {
610 // TODO: Think about whether we need the ability to turn off
611 // liveness monitoring
612 panic("Turning off liveness reporting is not supported")
613 }
614 return sc.liveness
615}
616
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800617// Enable the Healthiness monitor channel. This channel will report "false"
618// if the kafka consumers die, or some other problem occurs which is
619// catastrophic that would require re-creating the client.
620func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
621 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
622 if enable {
623 if sc.healthiness == nil {
624 logger.Info("kafka-create-healthiness-channel")
625 // At least 1, so we can immediately post to it without blocking
626 // Setting a bigger number (10) allows the monitor to fall behind
627 // without blocking others. The monitor shouldn't really fall
628 // behind...
629 sc.healthiness = make(chan bool, 10)
630 // post intial state to the channel
631 sc.healthiness <- sc.healthy
632 }
633 } else {
634 // TODO: Think about whether we need the ability to turn off
635 // liveness monitoring
636 panic("Turning off healthiness reporting is not supported")
637 }
638 return sc.healthiness
639}
640
Scott Bakeree6a0872019-10-29 15:59:52 -0700641// send an empty message on the liveness channel to check whether connectivity has
642// been restored.
643func (sc *SaramaClient) SendLiveness() error {
644 if !sc.started {
645 return fmt.Errorf("SendLiveness() called while not started")
646 }
647
648 kafkaMsg := &sarama.ProducerMessage{
649 Topic: "_liveness_test",
650 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
651 }
652
653 // Send message to kafka
654 sc.producer.Input() <- kafkaMsg
655 // Wait for result
656 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
657 select {
658 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800659 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700660 sc.updateLiveness(true)
661 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800662 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000663 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700664 sc.updateLiveness(false)
665 }
khenaidoo90847922018-12-03 14:47:51 -0500666 return notOk
667 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500668 return nil
669}
670
khenaidooca301322019-01-09 23:06:32 -0500671// getGroupId returns the group id from the key-value args.
672func getGroupId(kvArgs ...*KVArg) string {
673 for _, arg := range kvArgs {
674 if arg.Key == GroupIdKey {
675 return arg.Value.(string)
676 }
677 }
678 return ""
679}
680
khenaidoo731697e2019-01-29 16:03:29 -0500681// getOffset returns the offset from the key-value args.
682func getOffset(kvArgs ...*KVArg) int64 {
683 for _, arg := range kvArgs {
684 if arg.Key == Offset {
685 return arg.Value.(int64)
686 }
687 }
688 return sarama.OffsetNewest
689}
690
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500691func (sc *SaramaClient) createClusterAdmin() error {
692 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
693 config := sarama.NewConfig()
694 config.Version = sarama.V1_0_0_0
695
696 // Create a cluster Admin
697 var cAdmin sarama.ClusterAdmin
698 var err error
699 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800700 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500701 return err
702 }
703 sc.cAdmin = cAdmin
704 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500705}
706
khenaidood2b6df92018-12-13 16:37:20 -0500707func (sc *SaramaClient) lockTopic(topic *Topic) {
708 sc.lockOfTopicLockMap.Lock()
709 if _, exist := sc.topicLockMap[topic.Name]; exist {
710 sc.lockOfTopicLockMap.Unlock()
711 sc.topicLockMap[topic.Name].Lock()
712 } else {
713 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
714 sc.lockOfTopicLockMap.Unlock()
715 sc.topicLockMap[topic.Name].Lock()
716 }
717}
718
719func (sc *SaramaClient) unLockTopic(topic *Topic) {
720 sc.lockOfTopicLockMap.Lock()
721 defer sc.lockOfTopicLockMap.Unlock()
722 if _, exist := sc.topicLockMap[topic.Name]; exist {
723 sc.topicLockMap[topic.Name].Unlock()
724 }
725}
726
khenaidoo43c82122018-11-22 18:38:28 -0500727func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
728 sc.lockTopicToConsumerChannelMap.Lock()
729 defer sc.lockTopicToConsumerChannelMap.Unlock()
730 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
731 sc.topicToConsumerChannelMap[id] = arg
732 }
733}
734
735func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
739 delete(sc.topicToConsumerChannelMap, id)
740 }
741}
742
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
khenaidoo79232702018-12-04 11:00:41 -0500753func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800760 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500761}
762
763//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
764func closeConsumers(consumers []interface{}) error {
765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumers?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800770 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur on race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
778 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur on race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
khenaidoo43c82122018-11-22 18:38:28 -0500790}
791
khenaidoo79232702018-12-04 11:00:41 -0500792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
797 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500798 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500799 if len(consumerCh.channels) == 0 {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800800 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500801 err := closeConsumers(consumerCh.consumers)
802 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800808 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500809 return errors.New("topic-does-not-exist")
810}
811
812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
818 removeChannel(consumerCh.channels, ch)
819 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500820 err := closeConsumers(consumerCh.consumers)
821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800829 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500830 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500831}
832
833func (sc *SaramaClient) clearConsumerChannelMap() error {
834 sc.lockTopicToConsumerChannelMap.Lock()
835 defer sc.lockTopicToConsumerChannelMap.Unlock()
836 var err error
837 for topic, consumerCh := range sc.topicToConsumerChannelMap {
838 for _, ch := range consumerCh.channels {
839 // Channel will be closed in the removeChannel method
840 removeChannel(consumerCh.channels, ch)
841 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500842 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
843 err = errTemp
844 }
845 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500846 delete(sc.topicToConsumerChannelMap, topic)
847 }
848 return err
849}
850
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500851//createPublisher creates the publisher which is used to send a message onto kafka
852func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500853 // This Creates the publisher
854 config := sarama.NewConfig()
855 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500856 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
857 config.Producer.Flush.Messages = sc.producerFlushMessages
858 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
859 config.Producer.Return.Errors = sc.producerReturnErrors
860 config.Producer.Return.Successes = sc.producerReturnSuccess
861 //config.Producer.RequiredAcks = sarama.WaitForAll
862 config.Producer.RequiredAcks = sarama.WaitForLocal
863
khenaidoo43c82122018-11-22 18:38:28 -0500864 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
865 brokers := []string{kafkaFullAddr}
866
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500867 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800868 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500869 return err
870 } else {
871 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500872 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800873 logger.Info("Kafka-publisher-created")
khenaidoo43c82122018-11-22 18:38:28 -0500874 return nil
875}
876
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500877func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500878 config := sarama.NewConfig()
879 config.Consumer.Return.Errors = true
880 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500881 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
882 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500883 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530884 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500885 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
886 brokers := []string{kafkaFullAddr}
887
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500888 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800889 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500890 return err
891 } else {
892 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500893 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800894 logger.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500895 return nil
896}
897
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500898// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500899func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500900 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500901 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500902 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700903 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
904 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500905 //config.Group.Return.Notifications = false
906 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
907 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500908 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500909 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500910 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
911 brokers := []string{kafkaFullAddr}
912
khenaidoo43c82122018-11-22 18:38:28 -0500913 topics := []string{topic.Name}
914 var consumer *scc.Consumer
915 var err error
916
khenaidooca301322019-01-09 23:06:32 -0500917 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800918 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500919 return nil, err
920 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800921 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500922
923 //sc.groupConsumers[topic.Name] = consumer
924 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500925 return consumer, nil
926}
927
khenaidoo43c82122018-11-22 18:38:28 -0500928// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500929// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500930func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500931 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400932 sc.lockTopicToConsumerChannelMap.RLock()
933 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500934 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500935 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500936 c <- protoMessage
937 }(ch)
938 }
939}
940
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500941func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800942 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500943startloop:
944 for {
945 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500946 case err, ok := <-consumer.Errors():
947 if ok {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500948 if sc.isLivenessError(err) {
949 sc.updateLiveness(false)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800950 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500951 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500952 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500953 // Channel is closed
954 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500955 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500956 case msg, ok := <-consumer.Messages():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800957 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500958 if !ok {
959 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500960 break startloop
961 }
962 msgBody := msg.Value
khenaidoo6e55d9e2019-12-12 18:26:26 -0500963 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800964 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500965 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500966 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800967 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500968 continue
969 }
970 go sc.dispatchToConsumers(consumerChnls, icm)
971 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800972 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500973 break startloop
974 }
975 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800976 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
977 sc.setUnhealthy()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500978}
979
980func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800981 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500982
983startloop:
984 for {
985 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500986 case err, ok := <-consumer.Errors():
987 if ok {
Devmalya Paulc594bb32019-11-06 07:34:27 +0000988 if sc.isLivenessError(err) {
989 sc.updateLiveness(false)
990 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800991 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500992 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800993 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500994 // channel is closed
995 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500996 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500997 case msg, ok := <-consumer.Messages():
998 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800999 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001000 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001001 break startloop
1002 }
Scott Bakeree6a0872019-10-29 15:59:52 -07001003 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001004 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -05001005 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -05001006 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -05001007 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001008 logger.Warnw("invalid-message", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -05001009 continue
1010 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001011 go sc.dispatchToConsumers(consumerChnls, icm)
1012 consumer.MarkOffset(msg, "")
1013 case ntf := <-consumer.Notifications():
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001014 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -05001015 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001016 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001017 break startloop
1018 }
1019 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001020 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
1021 sc.setUnhealthy()
khenaidoo43c82122018-11-22 18:38:28 -05001022}
1023
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001024func (sc *SaramaClient) startConsumers(topic *Topic) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001025 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001026 var consumerCh *consumerChannels
1027 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001028 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001029 return errors.New("consumers-not-exist")
1030 }
1031 // For each consumer listening for that topic, start a consumption loop
1032 for _, consumer := range consumerCh.consumers {
1033 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1034 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1035 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1036 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1037 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001038 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001039 return errors.New("invalid-consumer")
1040 }
1041 }
1042 return nil
1043}
1044
1045//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1046//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -05001047func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001048 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001049 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001050
khenaidoo7ff26c72019-01-16 14:55:48 -05001051 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001052 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001053 return nil, err
1054 }
1055
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001056 consumersIf := make([]interface{}, 0)
1057 for _, pConsumer := range pConsumers {
1058 consumersIf = append(consumersIf, pConsumer)
1059 }
1060
1061 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001062 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001063 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001064 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001065 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001066 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001067 }
1068
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001069 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001070 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1071
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001072 //Start a consumers to listen on that specific topic
1073 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -05001074
1075 return consumerListeningChannel, nil
1076}
1077
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001078// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1079// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -05001080func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001081 // TODO: Replace this development partition consumers with a group consumers
1082 var pConsumer *scc.Consumer
1083 var err error
khenaidoo731697e2019-01-29 16:03:29 -05001084 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001085 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001086 return nil, err
1087 }
1088 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1089 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001090 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001091 cc := &consumerChannels{
1092 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001093 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001094 }
1095
1096 // Add the consumers channel to the map
1097 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1098
1099 //Start a consumers to listen on that specific topic
1100 go sc.startConsumers(topic)
1101
1102 return consumerListeningChannel, nil
1103}
1104
khenaidoo7ff26c72019-01-16 14:55:48 -05001105func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001106 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001107 partitionList, err := sc.consumer.Partitions(topic.Name)
1108 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001109 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001110 return nil, err
1111 }
1112
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001113 pConsumers := make([]sarama.PartitionConsumer, 0)
1114 for _, partition := range partitionList {
1115 var pConsumer sarama.PartitionConsumer
1116 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001117 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001118 return nil, err
1119 }
1120 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001121 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001122 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001123}
1124
khenaidoo79232702018-12-04 11:00:41 -05001125func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001126 var i int
khenaidoo79232702018-12-04 11:00:41 -05001127 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001128 for i, channel = range channels {
1129 if channel == ch {
1130 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1131 close(channel)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001132 logger.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001133 return channels[:len(channels)-1]
1134 }
1135 }
1136 return channels
1137}
khenaidoo7ff26c72019-01-16 14:55:48 -05001138
khenaidoo7ff26c72019-01-16 14:55:48 -05001139func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1140 sc.lockOfGroupConsumers.Lock()
1141 defer sc.lockOfGroupConsumers.Unlock()
1142 if _, exist := sc.groupConsumers[topic]; !exist {
1143 sc.groupConsumers[topic] = consumer
1144 }
1145}
1146
1147func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1148 sc.lockOfGroupConsumers.Lock()
1149 defer sc.lockOfGroupConsumers.Unlock()
1150 if _, exist := sc.groupConsumers[topic]; exist {
1151 consumer := sc.groupConsumers[topic]
1152 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001153 if err := consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001154 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -05001155 return err
1156 }
1157 }
1158 return nil
1159}