/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"errors"
	"fmt"
	"github.com/Shopify/sarama"
	scc "github.com/bsm/sarama-cluster"
	"github.com/golang/protobuf/proto"
	"github.com/opencord/voltha-go/common/log"
	ca "github.com/opencord/voltha-go/protos/core_adapter"
	"sync"
	"time"
)

// consumerChannels represents a consumer listening on a kafka topic. Once it receives a message on that
// topic it broadcasts the message to all the listening channels
type consumerChannels struct {
	consumer sarama.PartitionConsumer
	//consumer *sc.Consumer
	channels []chan *ca.InterContainerMessage
}

// SaramaClient represents the messaging proxy
type SaramaClient struct {
	broker                        *sarama.Broker
	client                        sarama.Client
	KafkaHost                     string
	KafkaPort                     int
	producer                      sarama.AsyncProducer
	consumer                      sarama.Consumer
	groupConsumer                 *scc.Consumer
	doneCh                        chan int
	topicToConsumerChannelMap     map[string]*consumerChannels
	lockTopicToConsumerChannelMap sync.RWMutex
}

// SaramaClientOption is a functional option used to configure a SaramaClient.
type SaramaClientOption func(*SaramaClient)

// Host sets the kafka host the client connects to.
func Host(host string) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaHost = host
	}
}

// Port sets the kafka port the client connects to.
func Port(port int) SaramaClientOption {
	return func(args *SaramaClient) {
		args.KafkaPort = port
	}
}

// NewSaramaClient creates a SaramaClient with the supplied options, defaulting to
// DefaultKafkaHost and DefaultKafkaPort when none are provided.
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
	client := &SaramaClient{
		KafkaHost: DefaultKafkaHost,
		KafkaPort: DefaultKafkaPort,
	}

	for _, option := range opts {
		option(client)
	}

	client.lockTopicToConsumerChannelMap = sync.RWMutex{}

	return client
}
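
// A minimal usage sketch of the functional options above (the broker address is an
// illustrative assumption, not a value defined by this package):
//
//	client := NewSaramaClient(Host("10.100.198.220"), Port(9092))
//	if err := client.Start(DefaultMaxRetries); err != nil {
//		log.Errorw("unable-to-start-sarama-client", log.Fields{"error": err})
//	}
//	defer client.Stop()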

// Start creates the kafka publisher and consumer and initializes the
// topic-to-consumer-channel map.  A negative retries value retries indefinitely.
func (sc *SaramaClient) Start(retries int) error {
	log.Info("Starting-Proxy")

	// Create the Done channel
	sc.doneCh = make(chan int, 1)

	// Create the Publisher
	if err := sc.createPublisher(retries); err != nil {
		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
		return err
	}

	// Create the master consumer
	if err := sc.createConsumer(retries); err != nil {
		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
		return err
	}

	// Create the topic to consumer/channel map
	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)

	return nil
}

// Stop signals all long-running consuming routines to exit and closes the
// producer and consumer.
func (sc *SaramaClient) Stop() {
	log.Info("stopping-sarama-client")

	// Close the done channel so that every long running routine waiting on it exits.
	// A single send would only wake one of the per-topic consuming loops.
	close(sc.doneCh)

	// Clear the consumer map
	//sc.clearConsumerChannelMap()

	if sc.producer != nil {
		if err := sc.producer.Close(); err != nil {
			panic(err)
		}
	}
	if sc.consumer != nil {
		if err := sc.consumer.Close(); err != nil {
			panic(err)
		}
	}
}

func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
		sc.topicToConsumerChannelMap[id] = arg
	}
}

func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
		delete(sc.topicToConsumerChannelMap, id)
	}
}

func (sc *SaramaClient) getConsumerChannel(topic Topic) *consumerChannels {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()

	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		return consumerCh
	}
	return nil
}

func (sc *SaramaClient) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		consumerCh.channels = append(consumerCh.channels, ch)
		return
	}
	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
}

func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Channel will be closed in the removeChannel method
		consumerCh.channels = removeChannel(consumerCh.channels, ch)
		// If there are no more channels then we can close the consumer itself
		if len(consumerCh.channels) == 0 {
			log.Debugw("closing-consumer", log.Fields{"topic": topic})
			err := consumerCh.consumer.Close()
			delete(sc.topicToConsumerChannelMap, topic.Name)
			return err
		}
		return nil
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
		// Close each subscriber channel exactly once.  Calling removeChannel while
		// ranging over the same slice it mutates can close a channel twice and panic,
		// so the channels are closed directly here.
		for _, ch := range consumerCh.channels {
			close(ch)
		}
		consumerCh.channels = nil
		err := consumerCh.consumer.Close()
		delete(sc.topicToConsumerChannelMap, topic.Name)
		return err
	}
	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
	return errors.New("topic-does-not-exist")
}

func (sc *SaramaClient) clearConsumerChannelMap() error {
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	var err error
	for topic, consumerCh := range sc.topicToConsumerChannelMap {
		// Close each subscriber channel exactly once (see clearTopicFromConsumerChannelMap)
		for _, ch := range consumerCh.channels {
			close(ch)
		}
		consumerCh.channels = nil
		err = consumerCh.consumer.Close()
		delete(sc.topicToConsumerChannelMap, topic)
	}
	return err
}

// CreateTopic creates the kafka topic with the requested number of partitions and
// replication factor, retrying on failure.  A negative retries value retries
// indefinitely.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
	// Set broker configuration
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	broker := sarama.NewBroker(kafkaFullAddr)

	// Additional configurations. Check sarama doc for more info
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	// Open broker connection with configs defined above
	if err := broker.Open(config); err != nil {
		log.Errorw("cannot-open-broker-connection", log.Fields{"error": err})
		return err
	}

	// check if the connection was OK
	if _, err := broker.Connected(); err != nil {
		return err
	}

	topicDetail := &sarama.TopicDetail{}
	topicDetail.NumPartitions = int32(numPartition)
	topicDetail.ReplicationFactor = int16(repFactor)
	topicDetail.ConfigEntries = make(map[string]*string)

	topicDetails := make(map[string]*sarama.TopicDetail)
	topicDetails[topic.Name] = topicDetail

	request := sarama.CreateTopicsRequest{
		Timeout:      time.Second * 1,
		TopicDetails: topicDetails,
	}

	for {
		// Send request to Broker
		if response, err := broker.CreateTopics(&request); err != nil {
			if retries == 0 {
				log.Errorw("error-creating-topic", log.Fields{"error": err})
				return err
			} else {
				// If retries is negative then we will retry indefinitely
				retries--
			}
			log.Debug("retrying-after-a-second-delay")
			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
		} else {
			log.Debugw("topic-response", log.Fields{"response": response})
			break
		}
	}

	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
	return nil
}
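
// An illustrative sketch of calling CreateTopic from code that owns a started
// client.  The topic name, partition count and replication factor are assumptions
// for illustration only; a negative retry count keeps retrying until the broker
// responds:
//
//	topic := &Topic{Name: "voltha-example-topic"}
//	if err := client.CreateTopic(topic, 3, 1, -1); err != nil {
//		log.Errorw("topic-creation-failed", log.Fields{"topic": topic.Name, "error": err})
//	}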

func (sc *SaramaClient) createPublisher(retries int) error {
	// This creates the publisher
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
	config.Producer.Flush.Messages = DefaultFlushMessages
	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
	config.Producer.Return.Errors = DefaultReturnErrors
	config.Producer.Return.Successes = DefaultReturnSuccess
	config.Producer.RequiredAcks = sarama.WaitForAll
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	for {
		producer, err := sarama.NewAsyncProducer(brokers, config)
		if err != nil {
			if retries == 0 {
				log.Errorw("error-starting-publisher", log.Fields{"error": err})
				return err
			} else {
				// If retries is negative then we will retry indefinitely
				retries--
			}
			log.Info("retrying-after-a-second-delay")
			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
		} else {
			sc.producer = producer
			break
		}
	}
	log.Info("Kafka-publisher-created")
	return nil
}

func (sc *SaramaClient) createConsumer(retries int) error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Fetch.Min = 1
	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	for {
		consumer, err := sarama.NewConsumer(brokers, config)
		if err != nil {
			if retries == 0 {
				log.Errorw("error-starting-consumer", log.Fields{"error": err})
				return err
			} else {
				// If retries is negative then we will retry indefinitely
				retries--
			}
			log.Info("retrying-after-a-second-delay")
			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
		} else {
			sc.consumer = consumer
			break
		}
	}
	log.Info("Kafka-consumer-created")
	return nil
}

// createGroupConsumer creates a consumer group
func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
	config := scc.NewConfig()
	config.Group.Mode = scc.ConsumerModeMultiplex
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
	brokers := []string{kafkaFullAddr}

	if groupId == nil {
		g := DefaultGroupName
		groupId = &g
	}
	topics := []string{topic.Name}
	var consumer *scc.Consumer
	var err error

	// Create the topic with default attributes
	// TODO: needs to be revisited
	//sc.CreateTopic(&Topic{Name:topic.Name}, 3, 1, 1)

	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
		log.Errorw("create-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": *groupId})
		return nil, err
	}
	log.Debugw("create-consumer-success", log.Fields{"topic": topic.Name, "groupId": *groupId})
	//time.Sleep(10*time.Second)
	sc.groupConsumer = consumer
	return consumer, nil
}
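
// createGroupConsumer is not yet wired into the Subscribe path (see the TODO in
// setupConsumerChannel).  A rough sketch of how the returned sarama-cluster consumer
// could be drained, assuming the channel-based bsm/sarama-cluster API configured
// above (Messages/Notifications/Errors); the topic name is illustrative only:
//
//	groupConsumer, err := sc.createGroupConsumer(&Topic{Name: "voltha-example-topic"}, nil, DefaultMaxRetries)
//	if err == nil {
//		for {
//			select {
//			case msg := <-groupConsumer.Messages():
//				// process msg, then mark the offset so the group commits progress
//				groupConsumer.MarkOffset(msg, "")
//			case ntf := <-groupConsumer.Notifications():
//				log.Debugw("rebalance-notification", log.Fields{"notification": ntf})
//			case err := <-groupConsumer.Errors():
//				log.Warnw("group-consumer-error", log.Fields{"error": err})
//			}
//		}
//	}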

// Send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) {

	// Assert message is a proto message
	var protoMsg proto.Message
	var ok bool
	// ascertain the value interface type is a proto.Message
	if protoMsg, ok = msg.(proto.Message); !ok {
		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
		return
	}

	// Create the Sarama producer message
	marshalled, err := proto.Marshal(protoMsg)
	if err != nil {
		log.Warnw("marshalling-failed", log.Fields{"error": err, "msg": msg})
		return
	}
	key := ""
	if len(keys) > 0 {
		key = keys[0] // Only the first key is relevant
	}
	kafkaMsg := &sarama.ProducerMessage{
		Topic: topic.Name,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(marshalled),
	}

	// Send message to kafka
	sc.producer.Input() <- kafkaMsg
}
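
// Send is fire-and-forget: it pushes the message onto the async producer's input
// channel and does not wait for, or report, the delivery result.  A minimal sending
// sketch (the topic name and routing key are assumptions for illustration only):
//
//	msg := &ca.InterContainerMessage{}
//	client.Send(msg, &Topic{Name: "voltha-example-topic"}, "device-1234")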

// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
func (sc *SaramaClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
	log.Debugw("subscribe", log.Fields{"topic": topic.Name})

	// If a consumer already exists for that topic then reuse it
	if consumerCh := sc.getConsumerChannel(*topic); consumerCh != nil {
		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
		// Create a channel specific for that consumer and add it to the consumer channel map
		ch := make(chan *ca.InterContainerMessage)
		sc.addChannelToConsumerChannelMap(*topic, ch)
		return ch, nil
	}

	// Register for the topic and set it up
	var consumerListeningChannel chan *ca.InterContainerMessage
	var err error
	if consumerListeningChannel, err = sc.setupConsumerChannel(topic); err != nil {
		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	return consumerListeningChannel, nil
}
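
// Subscribe and UnSubscribe pair up as in the sketch below (the topic name and
// handling are illustrative only; the returned channel is closed by UnSubscribe,
// which ends the receive loop):
//
//	topic := &Topic{Name: "voltha-example-topic"}
//	ch, err := client.Subscribe(topic, DefaultMaxRetries)
//	if err == nil {
//		go func() {
//			for icm := range ch {
//				log.Debugw("inter-container-message", log.Fields{"msg": icm})
//			}
//		}()
//		// ... later, when the caller no longer needs the topic:
//		client.UnSubscribe(topic, ch)
//	}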

// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
	// Need to go over all channels and publish messages to them - do we need to copy msg?
	sc.lockTopicToConsumerChannelMap.Lock()
	defer sc.lockTopicToConsumerChannelMap.Unlock()
	for _, ch := range consumerCh.channels {
		go func(c chan *ca.InterContainerMessage) {
			c <- protoMessage
		}(ch)
	}
}

func (sc *SaramaClient) consumeMessagesLoop(topic Topic) {
	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
	var consumerCh *consumerChannels
	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
		return
	}
startloop:
	for {
		select {
		case err := <-consumerCh.consumer.Errors():
			log.Warnw("consumer-error", log.Fields{"error": err})
		case msg := <-consumerCh.consumer.Messages():
			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
			// Since the only expected message is a proto InterContainerMessage, extract it right away
			// instead of dispatching the raw bytes to the consumers
			msgBody := msg.Value
			icm := &ca.InterContainerMessage{}
			if err := proto.Unmarshal(msgBody, icm); err != nil {
				log.Warnw("invalid-message", log.Fields{"error": err})
				continue
			}
			go sc.dispatchToConsumers(consumerCh, icm)

			//consumerCh.consumer.MarkOffset(msg, "")
			//// TODO: Dispatch requests and responses separately
		case <-sc.doneCh:
			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
}

// setupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupConsumerChannel(topic *Topic) (chan *ca.InterContainerMessage, error) {
	// TODO: Replace this development partition consumer with a group consumer
	var pConsumer *sarama.PartitionConsumer
	var err error
	if pConsumer, err = sc.CreatePartionConsumer(topic, DefaultMaxRetries); err != nil {
		log.Errorw("creating-partition-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}

	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
	// unbuffered to verify race conditions.
	consumerListeningChannel := make(chan *ca.InterContainerMessage)
	cc := &consumerChannels{
		consumer: *pConsumer,
		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
	}

	// Add the consumer channel to the map
	sc.addTopicToConsumerChannelMap(topic.Name, cc)

	//Start a consumer to listen on that specific topic
	go sc.consumeMessagesLoop(*topic)

	return consumerListeningChannel, nil
}

// CreatePartionConsumer creates a sarama partition consumer on the first partition of the given topic.
func (sc *SaramaClient) CreatePartionConsumer(topic *Topic, retries int) (*sarama.PartitionConsumer, error) {
	log.Debugw("creating-partition-consumer", log.Fields{"topic": topic.Name})
	partitionList, err := sc.consumer.Partitions(topic.Name)
	if err != nil {
		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	if len(partitionList) == 0 {
		log.Warnw("no-partitions-found", log.Fields{"topic": topic.Name})
		return nil, errors.New("no-partitions-found")
	}

	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
	// Create a partition consumer for that topic - for now just use one partition
	var pConsumer sarama.PartitionConsumer
	if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
		return nil, err
	}
	log.Debugw("partition-consumer-created", log.Fields{"topic": topic.Name})
	return &pConsumer, nil
}

// UnSubscribe removes the given channel from the topic's subscribers.  The channel is
// closed as part of the removal.
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
	return err
}

// removeChannel closes the given channel and removes it from the slice of channels,
// returning the shortened slice.
func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
	var i int
	var channel chan *ca.InterContainerMessage
	for i, channel = range channels {
		if channel == ch {
			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
			close(channel)
			return channels[:len(channels)-1]
		}
	}
	return channels
}

// DeleteTopic removes the consumer and subscriber channels associated with the topic
// on this client; it does not delete the topic from the kafka cluster itself.
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
		return err
	}
	return nil
}