blob: a4b12f7e6dfe9fc11759ea5c82da97f231cac903 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Himani Chawlacd407802020-12-10 12:08:59 +053017package events
Devmalya Paulfb990a52019-07-09 10:01:49 -040018
19import (
Neha Sharma96b7bf22020-06-15 10:37:32 +000020 "context"
Devmalya Paulfb990a52019-07-09 10:01:49 -040021 "errors"
22 "fmt"
Devmalya Paulfb990a52019-07-09 10:01:49 -040023 "strconv"
24 "strings"
25 "time"
Devmalya Paulfb990a52019-07-09 10:01:49 -040026
Scott Baker589724f2020-02-10 17:56:58 -080027 "github.com/golang/protobuf/ptypes"
Himani Chawlacd407802020-12-10 12:08:59 +053028 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070029 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v4/pkg/log"
31 "github.com/opencord/voltha-protos/v4/go/voltha"
Devmalya Paulfb990a52019-07-09 10:01:49 -040032)
33
34type EventProxy struct {
35 kafkaClient kafka.Client
36 eventTopic kafka.Topic
37}
38
39func NewEventProxy(opts ...EventProxyOption) *EventProxy {
40 var proxy EventProxy
41 for _, option := range opts {
42 option(&proxy)
43 }
44 return &proxy
45}
46
47type EventProxyOption func(*EventProxy)
48
49func MsgClient(client kafka.Client) EventProxyOption {
50 return func(args *EventProxy) {
51 args.kafkaClient = client
52 }
53}
54
55func MsgTopic(topic kafka.Topic) EventProxyOption {
56 return func(args *EventProxy) {
57 args.eventTopic = topic
58 }
59}
60
61func (ep *EventProxy) formatId(eventName string) string {
62 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
63}
64
Scott Baker589724f2020-02-10 17:56:58 -080065func (ep *EventProxy) getEventHeader(eventName string,
Himani Chawlacd407802020-12-10 12:08:59 +053066 category eventif.EventCategory,
67 subCategory *eventif.EventSubCategory,
68 eventType eventif.EventType,
Scott Baker589724f2020-02-10 17:56:58 -080069 raisedTs int64) (*voltha.EventHeader, error) {
Devmalya Paulfb990a52019-07-09 10:01:49 -040070 var header voltha.EventHeader
71 if strings.Contains(eventName, "_") {
72 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
73 } else {
74 eventName = "UNKNOWN_EVENT"
75 }
76 /* Populating event header */
77 header.Id = ep.formatId(eventName)
78 header.Category = category
Himani Chawlacd407802020-12-10 12:08:59 +053079 if subCategory != nil {
80 header.SubCategory = *subCategory
81 }
Devmalya Paulfb990a52019-07-09 10:01:49 -040082 header.Type = eventType
Himani Chawlacd407802020-12-10 12:08:59 +053083 header.TypeVersion = eventif.EventTypeVersion
Scott Baker589724f2020-02-10 17:56:58 -080084
85 // raisedTs is in nanoseconds
86 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
87 if err != nil {
88 return nil, err
89 }
90 header.RaisedTs = timestamp
91
92 timestamp, err = ptypes.TimestampProto(time.Now())
93 if err != nil {
94 return nil, err
95 }
96 header.ReportedTs = timestamp
97
98 return &header, nil
Devmalya Paulfb990a52019-07-09 10:01:49 -040099}
100
Himani Chawlacd407802020-12-10 12:08:59 +0530101/* Send out rpc events*/
102func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
103 if rpcEvent == nil {
104 logger.Error(ctx, "Received empty rpc event")
105 return errors.New("rpc event nil")
106 }
107 var event voltha.Event
108 var err error
109 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
110 return err
111 }
112 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
113 if err := ep.sendEvent(ctx, &event); err != nil {
114 logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
115 return err
116 }
117 logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
118 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
119 "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
120 "RPCEventName": id})
121
122 return nil
123
124}
125
Devmalya Paulfb990a52019-07-09 10:01:49 -0400126/* Send out device events*/
Himani Chawlacd407802020-12-10 12:08:59 +0530127func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400128 if deviceEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000129 logger.Error(ctx, "Recieved empty device event")
Devmalya Paulfb990a52019-07-09 10:01:49 -0400130 return errors.New("Device event nil")
131 }
132 var event voltha.Event
133 var de voltha.Event_DeviceEvent
Scott Baker589724f2020-02-10 17:56:58 -0800134 var err error
Devmalya Paulfb990a52019-07-09 10:01:49 -0400135 de.DeviceEvent = deviceEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530136 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800137 return err
138 }
Devmalya Paulfb990a52019-07-09 10:01:49 -0400139 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000140 if err := ep.sendEvent(ctx, &event); err != nil {
141 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400142 return err
143 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000144 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Devmalya Paulfb990a52019-07-09 10:01:49 -0400145 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
146 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
147 "DeviceEventName": deviceEvent.DeviceEventName})
148
149 return nil
150
151}
152
Naga Manjunath7615e552019-10-11 22:35:47 +0530153// SendKpiEvent is to send kpi events to voltha.event topic
Himani Chawlacd407802020-12-10 12:08:59 +0530154func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Naga Manjunath7615e552019-10-11 22:35:47 +0530155 if kpiEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000156 logger.Error(ctx, "Recieved empty kpi event")
Naga Manjunath7615e552019-10-11 22:35:47 +0530157 return errors.New("KPI event nil")
158 }
159 var event voltha.Event
160 var de voltha.Event_KpiEvent2
Scott Baker589724f2020-02-10 17:56:58 -0800161 var err error
Naga Manjunath7615e552019-10-11 22:35:47 +0530162 de.KpiEvent2 = kpiEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530163 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800164 return err
165 }
Naga Manjunath7615e552019-10-11 22:35:47 +0530166 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000167 if err := ep.sendEvent(ctx, &event); err != nil {
168 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath7615e552019-10-11 22:35:47 +0530169 return err
170 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000171 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath7615e552019-10-11 22:35:47 +0530172 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
173 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
174
175 return nil
176
177}
178
Neha Sharma96b7bf22020-06-15 10:37:32 +0000179func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
Himani Chawlacd407802020-12-10 12:08:59 +0530180 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
Neha Sharma96b7bf22020-06-15 10:37:32 +0000181 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400182 return err
183 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000184 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400185
186 return nil
187}