blob: 38c6cc1e34c02cff9e5e636fe1475dfb4665eb33 [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
// Package msgbus provides helpers for publishing events and metrics to the Kafka message bus.
18package msgbus
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "time"
25
26 "github.com/opencord/device-management-interface/go/dmi"
27
28 "github.com/Shopify/sarama"
29 "github.com/opencord/opendevice-manager/pkg/config"
30
31 "github.com/opencord/voltha-lib-go/v4/pkg/log"
32)
33
34var kafkaProducer sarama.SyncProducer
35
36// logger represents the log object
37var logger log.CLogger
38
39// init function for the package
40func init() {
41 logger = config.Initlog()
42}
43
44// InitMsgbusProducer initialises producer for kafka msgbus
45func InitMsgbusProducer(ctx context.Context) error {
46 cf := config.NewCoreFlags()
47 saramaConfig := sarama.NewConfig()
48 saramaConfig.Producer.Retry.Max = 6
49 saramaConfig.Producer.Retry.Backoff = time.Millisecond * 30
50 saramaConfig.Producer.Return.Successes = true
51 saramaConfig.Producer.Return.Errors = true
52
53 // The level of acknowledgement reliability needed from the broker.
54 saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
55 brokers := []string{cf.MsgbusEndPoint}
56 producer, err := sarama.NewSyncProducer(brokers, saramaConfig)
57
58 if err != nil {
59 logger.Errorw(ctx, "Failed-creating-kafka-producer", log.Fields{"error": err, "sarama-config": saramaConfig})
60 return err
61 }
62
63 kafkaProducer = producer
64 logger.Infow(ctx, "creating-kafka-producer-successful", log.Fields{"sarama-config": saramaConfig})
65 return nil
66}
67
68// SendEvent sends events over kafka bus
69func SendEvent(ctx context.Context, event *dmi.Event) error {
70 e, err := json.Marshal(event)
71 if err != nil {
72 logger.Errorw(ctx, "marshal-event-failed", log.Fields{"event": event})
73 return err
74 }
75 logger.Infow(ctx, "sending-event", log.Fields{"event": event})
76 return sendMsg(ctx, string(e), config.OpenDevMgrEventsTopic, event.EventId.String())
77}
78
79// SendMetric sends metrics over kafka bus
80func SendMetric(ctx context.Context, metric *dmi.Metric) error {
81 e, err := json.Marshal(metric)
82 if err != nil {
83 logger.Errorw(ctx, "marshal-metrics-failed", log.Fields{"metrics": metric})
84 return err
85 }
86 logger.Infow(ctx, "sending-metric", log.Fields{"metrics": metric})
87 return sendMsg(ctx, string(e), config.OpenDevMgrMetricsTopic, metric.MetricId.String())
88}
89
90// SendMsg function will help to publish the message to msgbus/kafka
91func sendMsg(ctx context.Context, msg, topic, key string) error {
92 if kafkaProducer != nil {
93 logger.Debugw(ctx, "sending-message", log.Fields{"msg": msg})
94 msg := &sarama.ProducerMessage{
95 Topic: topic,
96 Key: sarama.StringEncoder(key),
97 Value: sarama.StringEncoder(msg),
98 }
99
100 partition, offset, err := kafkaProducer.SendMessage(msg)
101 logger.Debugw(ctx, "kafka-msg-sent-info", log.Fields{"msg": msg, "partition": partition, "offset": offset, "error": err})
102 return err
103 }
104 logger.Errorw(ctx, "kafka-producer-not-found", log.Fields{"msg": msg, "topic": topic, "key": key})
105 return errors.New("kafka producer not found")
106}
107
108// Close close the msgbus connection
109func Close(ctx context.Context) {
110 if kafkaProducer != nil {
111 reason := "pod exited"
112 logger.Warnw(ctx, "Exiting-msg-bus", log.Fields{"reason": reason})
113 kafkaProducer.Close()
114 }
115}