blob: a4b12f7e6dfe9fc11759ea5c82da97f231cac903 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
Himani Chawlac07fda02020-12-09 16:21:21 +05302 * Copyright 2020-present Open Networking Foundation
Holger Hildebrandtfa074992020-03-27 15:42:06 +00003
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Himani Chawlac07fda02020-12-09 16:21:21 +053017package events
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018
19import (
dbainbri4d3a0dc2020-12-02 00:33:42 +000020 "context"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000021 "errors"
22 "fmt"
23 "strconv"
24 "strings"
25 "time"
26
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -070027 "github.com/golang/protobuf/ptypes"
Himani Chawlac07fda02020-12-09 16:21:21 +053028 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
dbainbri4d3a0dc2020-12-02 00:33:42 +000029 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v4/pkg/log"
31 "github.com/opencord/voltha-protos/v4/go/voltha"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000032)
33
34type EventProxy struct {
35 kafkaClient kafka.Client
36 eventTopic kafka.Topic
37}
38
39func NewEventProxy(opts ...EventProxyOption) *EventProxy {
40 var proxy EventProxy
41 for _, option := range opts {
42 option(&proxy)
43 }
44 return &proxy
45}
46
47type EventProxyOption func(*EventProxy)
48
49func MsgClient(client kafka.Client) EventProxyOption {
50 return func(args *EventProxy) {
51 args.kafkaClient = client
52 }
53}
54
55func MsgTopic(topic kafka.Topic) EventProxyOption {
56 return func(args *EventProxy) {
57 args.eventTopic = topic
58 }
59}
60
61func (ep *EventProxy) formatId(eventName string) string {
62 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
63}
64
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -070065func (ep *EventProxy) getEventHeader(eventName string,
Himani Chawlac07fda02020-12-09 16:21:21 +053066 category eventif.EventCategory,
67 subCategory *eventif.EventSubCategory,
68 eventType eventif.EventType,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -070069 raisedTs int64) (*voltha.EventHeader, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000070 var header voltha.EventHeader
71 if strings.Contains(eventName, "_") {
72 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
73 } else {
74 eventName = "UNKNOWN_EVENT"
75 }
76 /* Populating event header */
77 header.Id = ep.formatId(eventName)
78 header.Category = category
Himani Chawlac07fda02020-12-09 16:21:21 +053079 if subCategory != nil {
80 header.SubCategory = *subCategory
81 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +000082 header.Type = eventType
Himani Chawlac07fda02020-12-09 16:21:21 +053083 header.TypeVersion = eventif.EventTypeVersion
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -070084
85 // raisedTs is in nanoseconds
86 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
87 if err != nil {
88 return nil, err
89 }
90 header.RaisedTs = timestamp
91
92 timestamp, err = ptypes.TimestampProto(time.Now())
93 if err != nil {
94 return nil, err
95 }
96 header.ReportedTs = timestamp
97
98 return &header, nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +000099}
100
Himani Chawlac07fda02020-12-09 16:21:21 +0530101/* Send out rpc events*/
102func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
103 if rpcEvent == nil {
104 logger.Error(ctx, "Received empty rpc event")
105 return errors.New("rpc event nil")
106 }
107 var event voltha.Event
108 var err error
109 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
110 return err
111 }
112 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
113 if err := ep.sendEvent(ctx, &event); err != nil {
114 logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
115 return err
116 }
117 logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
118 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
119 "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
120 "RPCEventName": id})
121
122 return nil
123
124}
125
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000126/* Send out device events*/
Himani Chawlac07fda02020-12-09 16:21:21 +0530127func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000128 if deviceEvent == nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000129 logger.Error(ctx, "Recieved empty device event")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000130 return errors.New("Device event nil")
131 }
132 var event voltha.Event
133 var de voltha.Event_DeviceEvent
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700134 var err error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000135 de.DeviceEvent = deviceEvent
Himani Chawlac07fda02020-12-09 16:21:21 +0530136 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700137 return err
138 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000139 event.EventType = &de
dbainbri4d3a0dc2020-12-02 00:33:42 +0000140 if err := ep.sendEvent(ctx, &event); err != nil {
141 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142 return err
143 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000144 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000145 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
146 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
147 "DeviceEventName": deviceEvent.DeviceEventName})
148
149 return nil
150
151}
152
153// SendKpiEvent is to send kpi events to voltha.event topic
Himani Chawlac07fda02020-12-09 16:21:21 +0530154func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000155 if kpiEvent == nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000156 logger.Error(ctx, "Recieved empty kpi event")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000157 return errors.New("KPI event nil")
158 }
159 var event voltha.Event
160 var de voltha.Event_KpiEvent2
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700161 var err error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000162 de.KpiEvent2 = kpiEvent
Himani Chawlac07fda02020-12-09 16:21:21 +0530163 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700164 return err
165 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000166 event.EventType = &de
dbainbri4d3a0dc2020-12-02 00:33:42 +0000167 if err := ep.sendEvent(ctx, &event); err != nil {
168 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 return err
170 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000171 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000172 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
173 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
174
175 return nil
176
177}
178
dbainbri4d3a0dc2020-12-02 00:33:42 +0000179func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
Himani Chawlac07fda02020-12-09 16:21:21 +0530180 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000181 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000182 return err
183 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000184 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000185
186 return nil
187}