blob: c0c16f94bb7764216c38311008dd33ba38118871 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kafka
17
18import (
Devmalya Paulc594bb32019-11-06 07:34:27 +000019 "context"
khenaidoo43c82122018-11-22 18:38:28 -050020 "errors"
21 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "strings"
23 "sync"
24 "time"
25
Scott Bakerf2596722019-09-27 12:39:56 -070026 "github.com/Shopify/sarama"
khenaidoo43c82122018-11-22 18:38:28 -050027 scc "github.com/bsm/sarama-cluster"
Devmalya Paulc594bb32019-11-06 07:34:27 +000028 "github.com/eapache/go-resiliency/breaker"
khenaidoo43c82122018-11-22 18:38:28 -050029 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050030 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidoo43c82122018-11-22 18:38:28 -050033)
34
khenaidoo4c1a5bf2018-11-29 15:53:42 -050035type returnErrorFunction func() error
36
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
// consumer or a group consumer.
type consumerChannels struct {
	// consumers holds the kafka consumer(s) for the topic. It is typed as
	// interface{} so that either consumer kind can be stored here.
	consumers []interface{}
	// channels are the subscriber channels that received messages are fanned out to.
	channels []chan *ic.InterContainerMessage
}
44
npujar467fe752020-01-16 20:17:45 +053045// static check to ensure SaramaClient implements Client
46var _ Client = &SaramaClient{}
47
// SaramaClient represents the messaging proxy. It wraps the sarama cluster
// admin, producer and consumers used to exchange inter-container messages over
// kafka. Build it with NewSaramaClient, then call Start before use.
type SaramaClient struct {
	// Broker address and sarama handles. cAdmin is set by createClusterAdmin;
	// cAdmin, producer and consumer are closed by Stop when non-nil.
	cAdmin    sarama.ClusterAdmin
	client    sarama.Client
	KafkaHost string
	KafkaPort int
	producer  sarama.AsyncProducer
	consumer  sarama.Consumer

	// Group consumers (closed one by one in Stop, which logs each key as the
	// topic), the lock meant to guard that map, and group-naming settings.
	// Subscribe prefixes every group id with consumerGroupPrefix.
	groupConsumers       map[string]*scc.Consumer
	lockOfGroupConsumers sync.RWMutex
	consumerGroupPrefix  string
	consumerType         int
	consumerGroupName    string

	// Producer/consumer tuning; each value is set from a package default or
	// the matching SaramaClientOption in NewSaramaClient.
	producerFlushFrequency   int
	producerFlushMessages    int
	producerFlushMaxmessages int
	producerRetryMax         int
	producerRetryBackOff     time.Duration
	producerReturnSuccess    bool
	producerReturnErrors     bool
	consumerMaxwait          int
	maxProcessingTime        int
	numPartitions            int
	numReplicas              int
	autoCreateTopic          bool

	// Lifecycle and subscription bookkeeping. doneCh is created by Start and
	// signalled by Stop; metadataCallback is set by SubscribeForMetadata;
	// topicLockMap holds the per-topic locks used by lockTopic/unLockTopic and
	// is itself guarded by lockOfTopicLockMap.
	doneCh                        chan int
	metadataCallback              func(fromTopic string, timestamp int64)
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
	topicLockMap                  map[string]*sync.RWMutex
	lockOfTopicLockMap            sync.RWMutex
	metadataMaxRetry              int

	// Liveness/healthiness reporting. alive is maintained by updateLiveness
	// and published on liveness (created by EnableLivenessChannel); healthy is
	// cleared by setUnhealthy and published on healthiness (created by
	// EnableHealthinessChannel). started is set by Start and cleared by Stop.
	alive                   bool
	liveness                chan bool
	livenessChannelInterval time.Duration
	lastLivenessTime        time.Time
	started                 bool
	healthy                 bool
	healthiness             chan bool
}
88
89type SaramaClientOption func(*SaramaClient)
90
91func Host(host string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.KafkaHost = host
94 }
95}
96
97func Port(port int) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.KafkaPort = port
100 }
101}
102
khenaidooca301322019-01-09 23:06:32 -0500103func ConsumerGroupPrefix(prefix string) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerGroupPrefix = prefix
106 }
107}
108
109func ConsumerGroupName(name string) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.consumerGroupName = name
112 }
113}
114
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500115func ConsumerType(consumer int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.consumerType = consumer
118 }
119}
120
121func ProducerFlushFrequency(frequency int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushFrequency = frequency
124 }
125}
126
127func ProducerFlushMessages(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerFlushMessages = num
130 }
131}
132
133func ProducerFlushMaxMessages(num int) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerFlushMaxmessages = num
136 }
137}
138
khenaidoo90847922018-12-03 14:47:51 -0500139func ProducerMaxRetries(num int) SaramaClientOption {
140 return func(args *SaramaClient) {
141 args.producerRetryMax = num
142 }
143}
144
145func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
146 return func(args *SaramaClient) {
147 args.producerRetryBackOff = duration
148 }
149}
150
151func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500152 return func(args *SaramaClient) {
153 args.producerReturnErrors = opt
154 }
155}
156
khenaidoo90847922018-12-03 14:47:51 -0500157func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500158 return func(args *SaramaClient) {
159 args.producerReturnSuccess = opt
160 }
161}
162
163func ConsumerMaxWait(wait int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.consumerMaxwait = wait
166 }
167}
168
169func MaxProcessingTime(pTime int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.maxProcessingTime = pTime
172 }
173}
174
175func NumPartitions(number int) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.numPartitions = number
178 }
179}
180
181func NumReplicas(number int) SaramaClientOption {
182 return func(args *SaramaClient) {
183 args.numReplicas = number
184 }
185}
186
187func AutoCreateTopic(opt bool) SaramaClientOption {
188 return func(args *SaramaClient) {
189 args.autoCreateTopic = opt
190 }
191}
192
Abhilash S.L294ff522019-06-26 18:14:33 +0530193func MetadatMaxRetries(retry int) SaramaClientOption {
194 return func(args *SaramaClient) {
195 args.metadataMaxRetry = retry
196 }
197}
198
Scott Bakeree6a0872019-10-29 15:59:52 -0700199func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
200 return func(args *SaramaClient) {
201 args.livenessChannelInterval = opt
202 }
203}
204
khenaidoo43c82122018-11-22 18:38:28 -0500205func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
206 client := &SaramaClient{
207 KafkaHost: DefaultKafkaHost,
208 KafkaPort: DefaultKafkaPort,
209 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500210 client.consumerType = DefaultConsumerType
211 client.producerFlushFrequency = DefaultProducerFlushFrequency
212 client.producerFlushMessages = DefaultProducerFlushMessages
213 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
214 client.producerReturnErrors = DefaultProducerReturnErrors
215 client.producerReturnSuccess = DefaultProducerReturnSuccess
216 client.producerRetryMax = DefaultProducerRetryMax
217 client.producerRetryBackOff = DefaultProducerRetryBackoff
218 client.consumerMaxwait = DefaultConsumerMaxwait
219 client.maxProcessingTime = DefaultMaxProcessingTime
220 client.numPartitions = DefaultNumberPartitions
221 client.numReplicas = DefaultNumberReplicas
222 client.autoCreateTopic = DefaultAutoCreateTopic
Abhilash S.L294ff522019-06-26 18:14:33 +0530223 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Bakeree6a0872019-10-29 15:59:52 -0700224 client.livenessChannelInterval = DefaultLivenessChannelInterval
khenaidoo43c82122018-11-22 18:38:28 -0500225
226 for _, option := range opts {
227 option(client)
228 }
229
khenaidooca301322019-01-09 23:06:32 -0500230 client.groupConsumers = make(map[string]*scc.Consumer)
231
khenaidoo43c82122018-11-22 18:38:28 -0500232 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500233 client.topicLockMap = make(map[string]*sync.RWMutex)
234 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo7ff26c72019-01-16 14:55:48 -0500235 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Bakeree6a0872019-10-29 15:59:52 -0700236
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800237 // healthy and alive until proven otherwise
Scott Bakeree6a0872019-10-29 15:59:52 -0700238 client.alive = true
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800239 client.healthy = true
Scott Bakeree6a0872019-10-29 15:59:52 -0700240
khenaidoo43c82122018-11-22 18:38:28 -0500241 return client
242}
243
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500244func (sc *SaramaClient) Start() error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800245 logger.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500246
247 // Create the Done channel
248 sc.doneCh = make(chan int, 1)
249
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500250 var err error
251
khenaidoob3244212019-08-27 14:32:27 -0400252 // Add a cleanup in case of failure to startup
253 defer func() {
254 if err != nil {
255 sc.Stop()
256 }
257 }()
258
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500259 // Create the Cluster Admin
260 if err = sc.createClusterAdmin(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800261 logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500262 return err
263 }
264
khenaidoo43c82122018-11-22 18:38:28 -0500265 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500266 if err := sc.createPublisher(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800267 logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500268 return err
269 }
270
khenaidooca301322019-01-09 23:06:32 -0500271 if sc.consumerType == DefaultConsumerType {
272 // Create the master consumers
273 if err := sc.createConsumer(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800274 logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
khenaidooca301322019-01-09 23:06:32 -0500275 return err
276 }
khenaidoo43c82122018-11-22 18:38:28 -0500277 }
278
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500279 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500280 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
281
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800282 logger.Info("kafka-sarama-client-started")
khenaidooca301322019-01-09 23:06:32 -0500283
Scott Bakeree6a0872019-10-29 15:59:52 -0700284 sc.started = true
285
khenaidoo43c82122018-11-22 18:38:28 -0500286 return nil
287}
288
289func (sc *SaramaClient) Stop() {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800290 logger.Info("stopping-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500291
Scott Bakeree6a0872019-10-29 15:59:52 -0700292 sc.started = false
293
khenaidoo43c82122018-11-22 18:38:28 -0500294 //Send a message over the done channel to close all long running routines
295 sc.doneCh <- 1
296
khenaidoo43c82122018-11-22 18:38:28 -0500297 if sc.producer != nil {
298 if err := sc.producer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800299 logger.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500300 }
301 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500302
khenaidoo43c82122018-11-22 18:38:28 -0500303 if sc.consumer != nil {
304 if err := sc.consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800305 logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500306 }
307 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500308
khenaidooca301322019-01-09 23:06:32 -0500309 for key, val := range sc.groupConsumers {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800310 logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
khenaidooca301322019-01-09 23:06:32 -0500311 if err := val.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800312 logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500313 }
314 }
315
316 if sc.cAdmin != nil {
317 if err := sc.cAdmin.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800318 logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500319 }
320 }
321
322 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500323 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500324
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800325 logger.Info("sarama-client-stopped")
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500326}
327
khenaidooca301322019-01-09 23:06:32 -0500328//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
329// the invoking function must hold the lock
330func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500331 // Set the topic details
332 topicDetail := &sarama.TopicDetail{}
333 topicDetail.NumPartitions = int32(numPartition)
334 topicDetail.ReplicationFactor = int16(repFactor)
335 topicDetail.ConfigEntries = make(map[string]*string)
336 topicDetails := make(map[string]*sarama.TopicDetail)
337 topicDetails[topic.Name] = topicDetail
338
339 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
340 if err == sarama.ErrTopicAlreadyExists {
341 // Not an error
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800342 logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500343 return nil
344 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800345 logger.Errorw("create-topic-failure", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500346 return err
347 }
348 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
349 // do so.
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800350 logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500351 return nil
352}
353
khenaidooca301322019-01-09 23:06:32 -0500354//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
355// ensure no two go routines are performing operations on the same topic
356func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
357 sc.lockTopic(topic)
358 defer sc.unLockTopic(topic)
359
360 return sc.createTopic(topic, numPartition, repFactor)
361}
362
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500363//DeleteTopic removes a topic from the kafka Broker
364func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500365 sc.lockTopic(topic)
366 defer sc.unLockTopic(topic)
367
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500368 // Remove the topic from the broker
369 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
370 if err == sarama.ErrUnknownTopicOrPartition {
371 // Not an error as does not exist
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800372 logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500373 return nil
374 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800375 logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500376 return err
377 }
378
379 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
380 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800381 logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500382 return err
383 }
384 return nil
385}
386
387// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
388// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500389func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500390 sc.lockTopic(topic)
391 defer sc.unLockTopic(topic)
392
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800393 logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500394
395 // If a consumers already exist for that topic then resuse it
396 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800397 logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500398 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500399 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500400 sc.addChannelToConsumerChannelMap(topic, ch)
401 return ch, nil
402 }
403
404 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500405 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500406 var err error
407
408 // Use the consumerType option to figure out the type of consumer to launch
409 if sc.consumerType == PartitionConsumer {
410 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500411 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800412 logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500413 return nil, err
414 }
415 }
khenaidoo731697e2019-01-29 16:03:29 -0500416 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800417 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500418 return nil, err
419 }
420 } else if sc.consumerType == GroupCustomer {
421 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
422 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500423 //if sc.autoCreateTopic {
424 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800425 // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
khenaidooca301322019-01-09 23:06:32 -0500426 // return nil, err
427 // }
428 //}
429 //groupId := sc.consumerGroupName
430 groupId := getGroupId(kvArgs...)
431 // Include the group prefix
432 if groupId != "" {
433 groupId = sc.consumerGroupPrefix + groupId
434 } else {
435 // Need to use a unique group Id per topic
436 groupId = sc.consumerGroupPrefix + topic.Name
437 }
khenaidoo731697e2019-01-29 16:03:29 -0500438 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800439 logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500440 return nil, err
441 }
khenaidooca301322019-01-09 23:06:32 -0500442
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500443 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800444 logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500445 return nil, errors.New("unknown-consumer-type")
446 }
447
448 return consumerListeningChannel, nil
449}
450
451//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500452func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500453 sc.lockTopic(topic)
454 defer sc.unLockTopic(topic)
455
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800456 logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
khenaidoo7ff26c72019-01-16 14:55:48 -0500457 var err error
458 if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800459 logger.Errorw("failed-removing-channel", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500460 }
461 if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800462 logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -0500463 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500464 return err
465}
466
npujar467fe752020-01-16 20:17:45 +0530467func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
468 sc.metadataCallback = callback
469}
470
Scott Bakeree6a0872019-10-29 15:59:52 -0700471func (sc *SaramaClient) updateLiveness(alive bool) {
472 // Post a consistent stream of liveness data to the channel,
473 // so that in a live state, the core does not timeout and
474 // send a forced liveness message. Production of liveness
475 // events to the channel is rate-limited by livenessChannelInterval.
476 if sc.liveness != nil {
477 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800478 logger.Info("update-liveness-channel-because-change")
Scott Bakeree6a0872019-10-29 15:59:52 -0700479 sc.liveness <- alive
480 sc.lastLivenessTime = time.Now()
481 } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800482 logger.Info("update-liveness-channel-because-interval")
Scott Bakeree6a0872019-10-29 15:59:52 -0700483 sc.liveness <- alive
484 sc.lastLivenessTime = time.Now()
485 }
486 }
487
488 // Only emit a log message when the state changes
489 if sc.alive != alive {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800490 logger.Info("set-client-alive", log.Fields{"alive": alive})
Scott Bakeree6a0872019-10-29 15:59:52 -0700491 sc.alive = alive
492 }
493}
494
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800495// Once unhealthy, we never go back
496func (sc *SaramaClient) setUnhealthy() {
497 sc.healthy = false
498 if sc.healthiness != nil {
499 logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
500 sc.healthiness <- sc.healthy
501 }
502}
503
Devmalya Paulc594bb32019-11-06 07:34:27 +0000504func (sc *SaramaClient) isLivenessError(err error) bool {
505 // Sarama producers and consumers encapsulate the error inside
506 // a ProducerError or ConsumerError struct.
507 if prodError, ok := err.(*sarama.ProducerError); ok {
508 err = prodError.Err
509 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
510 err = consumerError.Err
511 }
512
513 // Sarama-Cluster will compose the error into a ClusterError struct,
514 // which we can't do a compare by reference. To handle that, we the
515 // best we can do is compare the error strings.
516
517 switch err.Error() {
518 case context.DeadlineExceeded.Error():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800519 logger.Info("is-liveness-error-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000520 return true
521 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800522 logger.Info("is-liveness-error-no-brokers")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000523 return true
524 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800525 logger.Info("is-liveness-error-shutting-down")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000526 return true
527 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800528 logger.Info("is-liveness-error-not-available")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000529 return true
530 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800531 logger.Info("is-liveness-error-circuit-breaker-open")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000532 return true
533 }
534
535 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800536 logger.Info("is-liveness-error-connection-refused")
537 return true
538 }
539
540 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
541 logger.Info("is-liveness-error-io-timeout")
Devmalya Paulc594bb32019-11-06 07:34:27 +0000542 return true
543 }
544
545 // Other errors shouldn't trigger a loss of liveness
546
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800547 logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000548
549 return false
550}
551
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500552// send formats and sends the request onto the kafka messaging bus.
553func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
554
555 // Assert message is a proto message
556 var protoMsg proto.Message
557 var ok bool
558 // ascertain the value interface type is a proto.Message
559 if protoMsg, ok = msg.(proto.Message); !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800560 logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500561 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
562 }
563
564 var marshalled []byte
565 var err error
566 // Create the Sarama producer message
567 if marshalled, err = proto.Marshal(protoMsg); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800568 logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500569 return err
570 }
571 key := ""
572 if len(keys) > 0 {
573 key = keys[0] // Only the first key is relevant
574 }
575 kafkaMsg := &sarama.ProducerMessage{
576 Topic: topic.Name,
577 Key: sarama.StringEncoder(key),
578 Value: sarama.ByteEncoder(marshalled),
579 }
580
581 // Send message to kafka
582 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500583 // Wait for result
584 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
585 select {
586 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800587 logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700588 sc.updateLiveness(true)
khenaidoo90847922018-12-03 14:47:51 -0500589 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800590 logger.Debugw("error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000591 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700592 sc.updateLiveness(false)
593 }
594 return notOk
595 }
596 return nil
597}
598
599// Enable the liveness monitor channel. This channel will report
600// a "true" or "false" on every publish, which indicates whether
601// or not the channel is still live. This channel is then picked up
602// by the service (i.e. rw_core / ro_core) to update readiness status
603// and/or take other actions.
604func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800605 logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Bakeree6a0872019-10-29 15:59:52 -0700606 if enable {
607 if sc.liveness == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800608 logger.Info("kafka-create-liveness-channel")
Scott Bakeree6a0872019-10-29 15:59:52 -0700609 // At least 1, so we can immediately post to it without blocking
610 // Setting a bigger number (10) allows the monitor to fall behind
611 // without blocking others. The monitor shouldn't really fall
612 // behind...
613 sc.liveness = make(chan bool, 10)
614 // post intial state to the channel
615 sc.liveness <- sc.alive
616 }
617 } else {
618 // TODO: Think about whether we need the ability to turn off
619 // liveness monitoring
620 panic("Turning off liveness reporting is not supported")
621 }
622 return sc.liveness
623}
624
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800625// Enable the Healthiness monitor channel. This channel will report "false"
626// if the kafka consumers die, or some other problem occurs which is
627// catastrophic that would require re-creating the client.
628func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
629 logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
630 if enable {
631 if sc.healthiness == nil {
632 logger.Info("kafka-create-healthiness-channel")
633 // At least 1, so we can immediately post to it without blocking
634 // Setting a bigger number (10) allows the monitor to fall behind
635 // without blocking others. The monitor shouldn't really fall
636 // behind...
637 sc.healthiness = make(chan bool, 10)
638 // post intial state to the channel
639 sc.healthiness <- sc.healthy
640 }
641 } else {
642 // TODO: Think about whether we need the ability to turn off
643 // liveness monitoring
644 panic("Turning off healthiness reporting is not supported")
645 }
646 return sc.healthiness
647}
648
Scott Bakeree6a0872019-10-29 15:59:52 -0700649// send an empty message on the liveness channel to check whether connectivity has
650// been restored.
651func (sc *SaramaClient) SendLiveness() error {
652 if !sc.started {
653 return fmt.Errorf("SendLiveness() called while not started")
654 }
655
656 kafkaMsg := &sarama.ProducerMessage{
657 Topic: "_liveness_test",
658 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
659 }
660
661 // Send message to kafka
662 sc.producer.Input() <- kafkaMsg
663 // Wait for result
664 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
665 select {
666 case ok := <-sc.producer.Successes():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800667 logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
Scott Bakeree6a0872019-10-29 15:59:52 -0700668 sc.updateLiveness(true)
669 case notOk := <-sc.producer.Errors():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800670 logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
Devmalya Paulc594bb32019-11-06 07:34:27 +0000671 if sc.isLivenessError(notOk) {
Scott Bakeree6a0872019-10-29 15:59:52 -0700672 sc.updateLiveness(false)
673 }
khenaidoo90847922018-12-03 14:47:51 -0500674 return notOk
675 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500676 return nil
677}
678
khenaidooca301322019-01-09 23:06:32 -0500679// getGroupId returns the group id from the key-value args.
680func getGroupId(kvArgs ...*KVArg) string {
681 for _, arg := range kvArgs {
682 if arg.Key == GroupIdKey {
683 return arg.Value.(string)
684 }
685 }
686 return ""
687}
688
khenaidoo731697e2019-01-29 16:03:29 -0500689// getOffset returns the offset from the key-value args.
690func getOffset(kvArgs ...*KVArg) int64 {
691 for _, arg := range kvArgs {
692 if arg.Key == Offset {
693 return arg.Value.(int64)
694 }
695 }
696 return sarama.OffsetNewest
697}
698
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500699func (sc *SaramaClient) createClusterAdmin() error {
700 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
707 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800708 logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500713}
714
khenaidood2b6df92018-12-13 16:37:20 -0500715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
khenaidoo43c82122018-11-22 18:38:28 -0500735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
743func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
744 sc.lockTopicToConsumerChannelMap.Lock()
745 defer sc.lockTopicToConsumerChannelMap.Unlock()
746 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
747 delete(sc.topicToConsumerChannelMap, id)
748 }
749}
750
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500751func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400752 sc.lockTopicToConsumerChannelMap.RLock()
753 defer sc.lockTopicToConsumerChannelMap.RUnlock()
khenaidoo43c82122018-11-22 18:38:28 -0500754
755 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
756 return consumerCh
757 }
758 return nil
759}
760
khenaidoo79232702018-12-04 11:00:41 -0500761func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500762 sc.lockTopicToConsumerChannelMap.Lock()
763 defer sc.lockTopicToConsumerChannelMap.Unlock()
764 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
765 consumerCh.channels = append(consumerCh.channels, ch)
766 return
767 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800768 logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500769}
770
771//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
772func closeConsumers(consumers []interface{}) error {
773 var err error
774 for _, consumer := range consumers {
775 // Is it a partition consumers?
776 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
777 if errTemp := partionConsumer.Close(); errTemp != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800778 logger.Debugw("partition!!!", log.Fields{"err": errTemp})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500779 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
780 // This can occur on race condition
781 err = nil
782 } else {
783 err = errTemp
784 }
785 }
786 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
787 if errTemp := groupConsumer.Close(); errTemp != nil {
788 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
789 // This can occur on race condition
790 err = nil
791 } else {
792 err = errTemp
793 }
794 }
795 }
796 }
797 return err
khenaidoo43c82122018-11-22 18:38:28 -0500798}
799
khenaidoo79232702018-12-04 11:00:41 -0500800func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500801 sc.lockTopicToConsumerChannelMap.Lock()
802 defer sc.lockTopicToConsumerChannelMap.Unlock()
803 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
804 // Channel will be closed in the removeChannel method
805 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500806 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500807 if len(consumerCh.channels) == 0 {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800808 logger.Debugw("closing-consumers", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500809 err := closeConsumers(consumerCh.consumers)
810 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500811 delete(sc.topicToConsumerChannelMap, topic.Name)
812 return err
813 }
814 return nil
815 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800816 logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500817 return errors.New("topic-does-not-exist")
818}
819
820func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
821 sc.lockTopicToConsumerChannelMap.Lock()
822 defer sc.lockTopicToConsumerChannelMap.Unlock()
823 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
824 for _, ch := range consumerCh.channels {
825 // Channel will be closed in the removeChannel method
826 removeChannel(consumerCh.channels, ch)
827 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500828 err := closeConsumers(consumerCh.consumers)
829 //if err == sarama.ErrUnknownTopicOrPartition {
830 // // Not an error
831 // err = nil
832 //}
833 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500834 delete(sc.topicToConsumerChannelMap, topic.Name)
835 return err
836 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800837 logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500838 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500839}
840
841func (sc *SaramaClient) clearConsumerChannelMap() error {
842 sc.lockTopicToConsumerChannelMap.Lock()
843 defer sc.lockTopicToConsumerChannelMap.Unlock()
844 var err error
845 for topic, consumerCh := range sc.topicToConsumerChannelMap {
846 for _, ch := range consumerCh.channels {
847 // Channel will be closed in the removeChannel method
848 removeChannel(consumerCh.channels, ch)
849 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500850 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
851 err = errTemp
852 }
853 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500854 delete(sc.topicToConsumerChannelMap, topic)
855 }
856 return err
857}
858
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500859//createPublisher creates the publisher which is used to send a message onto kafka
860func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500861 // This Creates the publisher
862 config := sarama.NewConfig()
863 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500864 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
865 config.Producer.Flush.Messages = sc.producerFlushMessages
866 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
867 config.Producer.Return.Errors = sc.producerReturnErrors
868 config.Producer.Return.Successes = sc.producerReturnSuccess
869 //config.Producer.RequiredAcks = sarama.WaitForAll
870 config.Producer.RequiredAcks = sarama.WaitForLocal
871
khenaidoo43c82122018-11-22 18:38:28 -0500872 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
873 brokers := []string{kafkaFullAddr}
874
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500875 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800876 logger.Errorw("error-starting-publisher", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500877 return err
878 } else {
879 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500880 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800881 logger.Info("Kafka-publisher-created")
khenaidoo43c82122018-11-22 18:38:28 -0500882 return nil
883}
884
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500885func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500886 config := sarama.NewConfig()
887 config.Consumer.Return.Errors = true
888 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500889 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
890 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500891 config.Consumer.Offsets.Initial = sarama.OffsetNewest
Abhilash S.L294ff522019-06-26 18:14:33 +0530892 config.Metadata.Retry.Max = sc.metadataMaxRetry
khenaidoo43c82122018-11-22 18:38:28 -0500893 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
894 brokers := []string{kafkaFullAddr}
895
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500896 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800897 logger.Errorw("error-starting-consumers", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500898 return err
899 } else {
900 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500901 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800902 logger.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500903 return nil
904}
905
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500906// createGroupConsumer creates a consumers group
khenaidoo731697e2019-01-29 16:03:29 -0500907func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500908 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500909 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500910 config.Group.Mode = scc.ConsumerModeMultiplex
Scott Bakeree6a0872019-10-29 15:59:52 -0700911 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
912 config.Consumer.Return.Errors = true
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500913 //config.Group.Return.Notifications = false
914 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
915 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo731697e2019-01-29 16:03:29 -0500916 config.Consumer.Offsets.Initial = initialOffset
khenaidooca301322019-01-09 23:06:32 -0500917 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500918 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
919 brokers := []string{kafkaFullAddr}
920
khenaidoo43c82122018-11-22 18:38:28 -0500921 topics := []string{topic.Name}
922 var consumer *scc.Consumer
923 var err error
924
khenaidooca301322019-01-09 23:06:32 -0500925 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800926 logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500927 return nil, err
928 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800929 logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo7ff26c72019-01-16 14:55:48 -0500930
931 //sc.groupConsumers[topic.Name] = consumer
932 sc.addToGroupConsumers(topic.Name, consumer)
khenaidoo43c82122018-11-22 18:38:28 -0500933 return consumer, nil
934}
935
khenaidoo43c82122018-11-22 18:38:28 -0500936// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
khenaidoo731697e2019-01-29 16:03:29 -0500937// topic via the unique channel each subscriber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500938func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500939 // Need to go over all channels and publish messages to them - do we need to copy msg?
khenaidoo1ce37ad2019-03-24 22:07:24 -0400940 sc.lockTopicToConsumerChannelMap.RLock()
khenaidoo43c82122018-11-22 18:38:28 -0500941 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500942 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500943 c <- protoMessage
944 }(ch)
945 }
npujar467fe752020-01-16 20:17:45 +0530946 sc.lockTopicToConsumerChannelMap.RUnlock()
947
948 if callback := sc.metadataCallback; callback != nil {
949 callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
950 }
khenaidoo43c82122018-11-22 18:38:28 -0500951}
952
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500953func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800954 logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500955startloop:
956 for {
957 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500958 case err, ok := <-consumer.Errors():
959 if ok {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500960 if sc.isLivenessError(err) {
961 sc.updateLiveness(false)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800962 logger.Warnw("partition-consumers-error", log.Fields{"error": err})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500963 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500964 } else {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500965 // Channel is closed
966 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500967 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500968 case msg, ok := <-consumer.Messages():
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800969 //logger.Debugw("message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500970 if !ok {
971 // channel is closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500972 break startloop
973 }
974 msgBody := msg.Value
khenaidoo6e55d9e2019-12-12 18:26:26 -0500975 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800976 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo79232702018-12-04 11:00:41 -0500977 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500978 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800979 logger.Warnw("partition-invalid-message", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500980 continue
981 }
982 go sc.dispatchToConsumers(consumerChnls, icm)
983 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800984 logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500985 break startloop
986 }
987 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800988 logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
989 sc.setUnhealthy()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500990}
991
992func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800993 logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500994
995startloop:
996 for {
997 select {
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500998 case err, ok := <-consumer.Errors():
999 if ok {
Devmalya Paulc594bb32019-11-06 07:34:27 +00001000 if sc.isLivenessError(err) {
1001 sc.updateLiveness(false)
1002 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001003 logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001004 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001005 logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001006 // channel is closed
1007 break startloop
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001008 }
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001009 case msg, ok := <-consumer.Messages():
1010 if !ok {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001011 logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
khenaidoo3dfc8bc2019-01-10 16:48:25 -05001012 // Channel closed
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001013 break startloop
1014 }
Scott Bakeree6a0872019-10-29 15:59:52 -07001015 sc.updateLiveness(true)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001016 logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -05001017 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -05001018 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -05001019 if err := proto.Unmarshal(msgBody, icm); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001020 logger.Warnw("invalid-message", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -05001021 continue
1022 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001023 go sc.dispatchToConsumers(consumerChnls, icm)
1024 consumer.MarkOffset(msg, "")
1025 case ntf := <-consumer.Notifications():
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001026 logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -05001027 case <-sc.doneCh:
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001028 logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001029 break startloop
1030 }
1031 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001032 logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
1033 sc.setUnhealthy()
khenaidoo43c82122018-11-22 18:38:28 -05001034}
1035
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001036func (sc *SaramaClient) startConsumers(topic *Topic) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001037 logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001038 var consumerCh *consumerChannels
1039 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001040 logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001041 return errors.New("consumers-not-exist")
1042 }
1043 // For each consumer listening for that topic, start a consumption loop
1044 for _, consumer := range consumerCh.consumers {
1045 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
1046 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
1047 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
1048 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
1049 } else {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001050 logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001051 return errors.New("invalid-consumer")
1052 }
1053 }
1054 return nil
1055}
1056
1057//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1058//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -05001059func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001060 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -05001061 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001062
khenaidoo7ff26c72019-01-16 14:55:48 -05001063 if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001064 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001065 return nil, err
1066 }
1067
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001068 consumersIf := make([]interface{}, 0)
1069 for _, pConsumer := range pConsumers {
1070 consumersIf = append(consumersIf, pConsumer)
1071 }
1072
1073 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -05001074 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001075 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -05001076 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001077 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -05001078 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -05001079 }
1080
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001081 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -05001082 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1083
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001084 //Start a consumers to listen on that specific topic
1085 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -05001086
1087 return consumerListeningChannel, nil
1088}
1089
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001090// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1091// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo731697e2019-01-29 16:03:29 -05001092func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001093 // TODO: Replace this development partition consumers with a group consumers
1094 var pConsumer *scc.Consumer
1095 var err error
khenaidoo731697e2019-01-29 16:03:29 -05001096 if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001097 logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001098 return nil, err
1099 }
1100 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1101 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -05001102 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001103 cc := &consumerChannels{
1104 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -05001105 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001106 }
1107
1108 // Add the consumers channel to the map
1109 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1110
1111 //Start a consumers to listen on that specific topic
1112 go sc.startConsumers(topic)
1113
1114 return consumerListeningChannel, nil
1115}
1116
khenaidoo7ff26c72019-01-16 14:55:48 -05001117func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001118 logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001119 partitionList, err := sc.consumer.Partitions(topic.Name)
1120 if err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001121 logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -05001122 return nil, err
1123 }
1124
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001125 pConsumers := make([]sarama.PartitionConsumer, 0)
1126 for _, partition := range partitionList {
1127 var pConsumer sarama.PartitionConsumer
1128 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001129 logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001130 return nil, err
1131 }
1132 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -05001133 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -05001134 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -05001135}
1136
khenaidoo79232702018-12-04 11:00:41 -05001137func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -05001138 var i int
khenaidoo79232702018-12-04 11:00:41 -05001139 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -05001140 for i, channel = range channels {
1141 if channel == ch {
1142 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1143 close(channel)
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001144 logger.Debug("channel-closed")
khenaidoo43c82122018-11-22 18:38:28 -05001145 return channels[:len(channels)-1]
1146 }
1147 }
1148 return channels
1149}
khenaidoo7ff26c72019-01-16 14:55:48 -05001150
khenaidoo7ff26c72019-01-16 14:55:48 -05001151func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
1152 sc.lockOfGroupConsumers.Lock()
1153 defer sc.lockOfGroupConsumers.Unlock()
1154 if _, exist := sc.groupConsumers[topic]; !exist {
1155 sc.groupConsumers[topic] = consumer
1156 }
1157}
1158
1159func (sc *SaramaClient) deleteFromGroupConsumers(topic string) error {
1160 sc.lockOfGroupConsumers.Lock()
1161 defer sc.lockOfGroupConsumers.Unlock()
1162 if _, exist := sc.groupConsumers[topic]; exist {
1163 consumer := sc.groupConsumers[topic]
1164 delete(sc.groupConsumers, topic)
khenaidoo2c6a0992019-04-29 13:46:56 -04001165 if err := consumer.Close(); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -08001166 logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
khenaidoo7ff26c72019-01-16 14:55:48 -05001167 return err
1168 }
1169 }
1170 return nil
1171}