blob: 9ee01726903286737189a97f215ff52ed51fc7e7 [file] [log] [blame]
Humera Kousera4442952020-11-23 23:51:19 +05301/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package dmiserver
18
19import (
20 "context"
21 "encoding/json"
22 "strconv"
23 "time"
24
25 "github.com/Shopify/sarama"
26 log "github.com/sirupsen/logrus"
27)
28
29var metricsProducer sarama.AsyncProducer
30
31// InitializeDMKafkaPublishers initializes metrics kafka publisher
32func InitializeDMKafkaPublishers(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int, msgBusEndPoint string) error {
33 var err error
34 sarama.Logger = log.New()
35 // producer config
36 config := sarama.NewConfig()
37 config.Producer.Retry.Max = 5
38 config.Metadata.Retry.Max = 10
39 config.Metadata.Retry.Backoff = 10 * time.Second
40 config.ClientID = "BBSim-OLT-Metrics-" + strconv.Itoa(oltID)
41
42 metricsProducer, err = NewAsyncProducer([]string{msgBusEndPoint}, config)
43 return err
44}
45
46// DMKafkaPublisher receives messages on ch and publish them to kafka on topic
47func DMKafkaPublisher(ctx context.Context, ch chan interface{}, topic string) {
48 defer log.Debugf("DMKafkaPublisher stopped")
49loop:
50 for {
51 select {
52 case metric := <-ch:
53 log.Tracef("Writing to kafka topic(%s): %v", topic, metric)
54 jsonMet, err := json.Marshal(metric)
55 if err != nil {
56 log.Errorf("Failed to get json metric %v", err)
57 continue
58 }
59 metricsProducer.Input() <- &sarama.ProducerMessage{
60 Topic: topic,
61 Value: sarama.ByteEncoder(jsonMet),
62 }
63 case <-ctx.Done():
64 log.Infof("Stopping DM Kafka Publisher for topic %s", topic)
65 break loop
66 }
67 }
68}