blob: da9c9eb53b29b693e17598f17fe08d5398622d79 [file] [log] [blame]
Devmalya Paulfb990a52019-07-09 10:01:49 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package common
18
19import (
20 "errors"
21 "fmt"
Devmalya Paulfb990a52019-07-09 10:01:49 -040022 "strconv"
23 "strings"
24 "time"
Devmalya Paulfb990a52019-07-09 10:01:49 -040025
Scott Baker589724f2020-02-10 17:56:58 -080026 "github.com/golang/protobuf/ptypes"
Esin Karamanccb714b2019-11-29 15:02:06 +000027 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
28 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
29 "github.com/opencord/voltha-lib-go/v3/pkg/log"
30 "github.com/opencord/voltha-protos/v3/go/voltha"
Devmalya Paulfb990a52019-07-09 10:01:49 -040031)
32
33type EventProxy struct {
34 kafkaClient kafka.Client
35 eventTopic kafka.Topic
36}
37
38func NewEventProxy(opts ...EventProxyOption) *EventProxy {
39 var proxy EventProxy
40 for _, option := range opts {
41 option(&proxy)
42 }
43 return &proxy
44}
45
46type EventProxyOption func(*EventProxy)
47
48func MsgClient(client kafka.Client) EventProxyOption {
49 return func(args *EventProxy) {
50 args.kafkaClient = client
51 }
52}
53
54func MsgTopic(topic kafka.Topic) EventProxyOption {
55 return func(args *EventProxy) {
56 args.eventTopic = topic
57 }
58}
59
60func (ep *EventProxy) formatId(eventName string) string {
61 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
62}
63
Scott Baker589724f2020-02-10 17:56:58 -080064func (ep *EventProxy) getEventHeader(eventName string,
65 category adapterif.EventCategory,
66 subCategory adapterif.EventSubCategory,
67 eventType adapterif.EventType,
68 raisedTs int64) (*voltha.EventHeader, error) {
Devmalya Paulfb990a52019-07-09 10:01:49 -040069 var header voltha.EventHeader
70 if strings.Contains(eventName, "_") {
71 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
72 } else {
73 eventName = "UNKNOWN_EVENT"
74 }
75 /* Populating event header */
76 header.Id = ep.formatId(eventName)
77 header.Category = category
78 header.SubCategory = subCategory
79 header.Type = eventType
Devmalya Paul495b94a2019-08-27 19:42:00 -040080 header.TypeVersion = adapterif.EventTypeVersion
Scott Baker589724f2020-02-10 17:56:58 -080081
82 // raisedTs is in nanoseconds
83 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
84 if err != nil {
85 return nil, err
86 }
87 header.RaisedTs = timestamp
88
89 timestamp, err = ptypes.TimestampProto(time.Now())
90 if err != nil {
91 return nil, err
92 }
93 header.ReportedTs = timestamp
94
95 return &header, nil
Devmalya Paulfb990a52019-07-09 10:01:49 -040096}
97
98/* Send out device events*/
Devmalya Paul495b94a2019-08-27 19:42:00 -040099func (ep *EventProxy) SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400100 if deviceEvent == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000101 logger.Error("Recieved empty device event")
Devmalya Paulfb990a52019-07-09 10:01:49 -0400102 return errors.New("Device event nil")
103 }
104 var event voltha.Event
105 var de voltha.Event_DeviceEvent
Scott Baker589724f2020-02-10 17:56:58 -0800106 var err error
Devmalya Paulfb990a52019-07-09 10:01:49 -0400107 de.DeviceEvent = deviceEvent
Scott Baker589724f2020-02-10 17:56:58 -0800108 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
109 return err
110 }
Devmalya Paulfb990a52019-07-09 10:01:49 -0400111 event.EventType = &de
112 if err := ep.sendEvent(&event); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000113 logger.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400114 return err
115 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000116 logger.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Devmalya Paulfb990a52019-07-09 10:01:49 -0400117 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
118 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
119 "DeviceEventName": deviceEvent.DeviceEventName})
120
121 return nil
122
123}
124
Naga Manjunath7615e552019-10-11 22:35:47 +0530125// SendKpiEvent is to send kpi events to voltha.event topic
126func (ep *EventProxy) SendKpiEvent(id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
127 if kpiEvent == nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000128 logger.Error("Recieved empty kpi event")
Naga Manjunath7615e552019-10-11 22:35:47 +0530129 return errors.New("KPI event nil")
130 }
131 var event voltha.Event
132 var de voltha.Event_KpiEvent2
Scott Baker589724f2020-02-10 17:56:58 -0800133 var err error
Naga Manjunath7615e552019-10-11 22:35:47 +0530134 de.KpiEvent2 = kpiEvent
Scott Baker589724f2020-02-10 17:56:58 -0800135 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
136 return err
137 }
Naga Manjunath7615e552019-10-11 22:35:47 +0530138 event.EventType = &de
139 if err := ep.sendEvent(&event); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +0000140 logger.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath7615e552019-10-11 22:35:47 +0530141 return err
142 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000143 logger.Infow("Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath7615e552019-10-11 22:35:47 +0530144 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
145 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
146
147 return nil
148
149}
150
Devmalya Paulfb990a52019-07-09 10:01:49 -0400151/* TODO: Send out KPI events*/
152
153func (ep *EventProxy) sendEvent(event *voltha.Event) error {
154 if err := ep.kafkaClient.Send(event, &ep.eventTopic); err != nil {
155 return err
156 }
Esin Karamanccb714b2019-11-29 15:02:06 +0000157 logger.Debugw("Sent event to kafka", log.Fields{"event": event})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400158
159 return nil
160}