blob: c4014eebd8de797efc4cfc8d895b98b71325a9af [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Himani Chawlacd407802020-12-10 12:08:59 +053017package events
Devmalya Paulfb990a52019-07-09 10:01:49 -040018
19import (
Neha Sharma96b7bf22020-06-15 10:37:32 +000020 "context"
Devmalya Paulfb990a52019-07-09 10:01:49 -040021 "errors"
22 "fmt"
Devmalya Paulfb990a52019-07-09 10:01:49 -040023 "strconv"
24 "strings"
25 "time"
Devmalya Paulfb990a52019-07-09 10:01:49 -040026
Scott Baker589724f2020-02-10 17:56:58 -080027 "github.com/golang/protobuf/ptypes"
Himani Chawlacd407802020-12-10 12:08:59 +053028 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070029 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v4/pkg/log"
31 "github.com/opencord/voltha-protos/v4/go/voltha"
Devmalya Paulfb990a52019-07-09 10:01:49 -040032)
33
34type EventProxy struct {
35 kafkaClient kafka.Client
36 eventTopic kafka.Topic
37}
38
39func NewEventProxy(opts ...EventProxyOption) *EventProxy {
40 var proxy EventProxy
41 for _, option := range opts {
42 option(&proxy)
43 }
44 return &proxy
45}
46
47type EventProxyOption func(*EventProxy)
48
49func MsgClient(client kafka.Client) EventProxyOption {
50 return func(args *EventProxy) {
51 args.kafkaClient = client
52 }
53}
54
55func MsgTopic(topic kafka.Topic) EventProxyOption {
56 return func(args *EventProxy) {
57 args.eventTopic = topic
58 }
59}
60
61func (ep *EventProxy) formatId(eventName string) string {
62 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
63}
64
Scott Baker589724f2020-02-10 17:56:58 -080065func (ep *EventProxy) getEventHeader(eventName string,
Himani Chawlacd407802020-12-10 12:08:59 +053066 category eventif.EventCategory,
67 subCategory *eventif.EventSubCategory,
68 eventType eventif.EventType,
Scott Baker589724f2020-02-10 17:56:58 -080069 raisedTs int64) (*voltha.EventHeader, error) {
Devmalya Paulfb990a52019-07-09 10:01:49 -040070 var header voltha.EventHeader
71 if strings.Contains(eventName, "_") {
72 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
73 } else {
74 eventName = "UNKNOWN_EVENT"
75 }
76 /* Populating event header */
77 header.Id = ep.formatId(eventName)
78 header.Category = category
Himani Chawlacd407802020-12-10 12:08:59 +053079 if subCategory != nil {
80 header.SubCategory = *subCategory
serkant.uluderya7b8211e2021-02-24 16:39:18 +030081 } else {
82 header.SubCategory = voltha.EventSubCategory_NONE
Himani Chawlacd407802020-12-10 12:08:59 +053083 }
Devmalya Paulfb990a52019-07-09 10:01:49 -040084 header.Type = eventType
Himani Chawlacd407802020-12-10 12:08:59 +053085 header.TypeVersion = eventif.EventTypeVersion
Scott Baker589724f2020-02-10 17:56:58 -080086
87 // raisedTs is in nanoseconds
88 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
89 if err != nil {
90 return nil, err
91 }
92 header.RaisedTs = timestamp
93
94 timestamp, err = ptypes.TimestampProto(time.Now())
95 if err != nil {
96 return nil, err
97 }
98 header.ReportedTs = timestamp
99
100 return &header, nil
Devmalya Paulfb990a52019-07-09 10:01:49 -0400101}
102
Himani Chawlacd407802020-12-10 12:08:59 +0530103/* Send out rpc events*/
104func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
105 if rpcEvent == nil {
106 logger.Error(ctx, "Received empty rpc event")
107 return errors.New("rpc event nil")
108 }
109 var event voltha.Event
110 var err error
111 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
112 return err
113 }
114 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
115 if err := ep.sendEvent(ctx, &event); err != nil {
116 logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
117 return err
118 }
119 logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
120 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
121 "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
122 "RPCEventName": id})
123
124 return nil
125
126}
127
Devmalya Paulfb990a52019-07-09 10:01:49 -0400128/* Send out device events*/
Himani Chawlacd407802020-12-10 12:08:59 +0530129func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400130 if deviceEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000131 logger.Error(ctx, "Recieved empty device event")
Devmalya Paulfb990a52019-07-09 10:01:49 -0400132 return errors.New("Device event nil")
133 }
134 var event voltha.Event
135 var de voltha.Event_DeviceEvent
Scott Baker589724f2020-02-10 17:56:58 -0800136 var err error
Devmalya Paulfb990a52019-07-09 10:01:49 -0400137 de.DeviceEvent = deviceEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530138 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800139 return err
140 }
Devmalya Paulfb990a52019-07-09 10:01:49 -0400141 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000142 if err := ep.sendEvent(ctx, &event); err != nil {
143 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400144 return err
145 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000146 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Devmalya Paulfb990a52019-07-09 10:01:49 -0400147 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
148 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
149 "DeviceEventName": deviceEvent.DeviceEventName})
150
151 return nil
152
153}
154
Naga Manjunath7615e552019-10-11 22:35:47 +0530155// SendKpiEvent is to send kpi events to voltha.event topic
Himani Chawlacd407802020-12-10 12:08:59 +0530156func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Naga Manjunath7615e552019-10-11 22:35:47 +0530157 if kpiEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000158 logger.Error(ctx, "Recieved empty kpi event")
Naga Manjunath7615e552019-10-11 22:35:47 +0530159 return errors.New("KPI event nil")
160 }
161 var event voltha.Event
162 var de voltha.Event_KpiEvent2
Scott Baker589724f2020-02-10 17:56:58 -0800163 var err error
Naga Manjunath7615e552019-10-11 22:35:47 +0530164 de.KpiEvent2 = kpiEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530165 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800166 return err
167 }
Naga Manjunath7615e552019-10-11 22:35:47 +0530168 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000169 if err := ep.sendEvent(ctx, &event); err != nil {
170 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath7615e552019-10-11 22:35:47 +0530171 return err
172 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000173 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath7615e552019-10-11 22:35:47 +0530174 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
175 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
176
177 return nil
178
179}
180
Neha Sharma96b7bf22020-06-15 10:37:32 +0000181func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
Himani Chawlacd407802020-12-10 12:08:59 +0530182 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
Neha Sharma96b7bf22020-06-15 10:37:32 +0000183 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400184 return err
185 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000186 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400187
188 return nil
189}