blob: 4248df2a1ff16c184be21a90e95465b38cffb574 [file] [log] [blame]
Humera Kousera4442952020-11-23 23:51:19 +05301/*
Joey Armstrong14628cd2023-01-10 08:38:31 -05002 * Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors
Humera Kousera4442952020-11-23 23:51:19 +05303
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package dmiserver
18
19import (
20 "context"
21 "encoding/json"
22 "strconv"
23 "time"
24
25 "github.com/Shopify/sarama"
26 log "github.com/sirupsen/logrus"
27)
28
hkouser24361d42020-12-14 19:21:47 +053029var producer sarama.AsyncProducer
Humera Kousera4442952020-11-23 23:51:19 +053030
31// InitializeDMKafkaPublishers initializes metrics kafka publisher
32func InitializeDMKafkaPublishers(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int, msgBusEndPoint string) error {
33 var err error
34 sarama.Logger = log.New()
35 // producer config
36 config := sarama.NewConfig()
37 config.Producer.Retry.Max = 5
38 config.Metadata.Retry.Max = 10
39 config.Metadata.Retry.Backoff = 10 * time.Second
hkouser24361d42020-12-14 19:21:47 +053040 config.ClientID = "BBSim-OLT-DMIServer-" + strconv.Itoa(oltID)
Humera Kousera4442952020-11-23 23:51:19 +053041
hkouser24361d42020-12-14 19:21:47 +053042 producer, err = NewAsyncProducer([]string{msgBusEndPoint}, config)
Humera Kousera4442952020-11-23 23:51:19 +053043 return err
44}
45
46// DMKafkaPublisher receives messages on ch and publish them to kafka on topic
47func DMKafkaPublisher(ctx context.Context, ch chan interface{}, topic string) {
48 defer log.Debugf("DMKafkaPublisher stopped")
49loop:
50 for {
51 select {
hkouser24361d42020-12-14 19:21:47 +053052 case data := <-ch:
53 log.Tracef("Writing to kafka topic(%s): %v", topic, data)
54 jsonData, err := json.Marshal(data)
Humera Kousera4442952020-11-23 23:51:19 +053055 if err != nil {
hkouser24361d42020-12-14 19:21:47 +053056 log.Errorf("Failed to get json %v", err)
Humera Kousera4442952020-11-23 23:51:19 +053057 continue
58 }
hkouser24361d42020-12-14 19:21:47 +053059 producer.Input() <- &sarama.ProducerMessage{
Humera Kousera4442952020-11-23 23:51:19 +053060 Topic: topic,
hkouser24361d42020-12-14 19:21:47 +053061 Value: sarama.ByteEncoder(jsonData),
Humera Kousera4442952020-11-23 23:51:19 +053062 }
63 case <-ctx.Done():
64 log.Infof("Stopping DM Kafka Publisher for topic %s", topic)
65 break loop
66 }
67 }
68}