blob: c4014eebd8de797efc4cfc8d895b98b71325a9af [file] [log] [blame]
Himani Chawlab4c25912020-11-12 17:16:38 +05301/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package events
18
19import (
20 "context"
21 "errors"
22 "fmt"
23 "strconv"
24 "strings"
25 "time"
26
27 "github.com/golang/protobuf/ptypes"
28 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
29 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v4/pkg/log"
31 "github.com/opencord/voltha-protos/v4/go/voltha"
32)
33
// EventProxy publishes voltha events to a single Kafka topic through a
// kafka.Client. Both fields are populated via EventProxyOption values passed
// to NewEventProxy.
type EventProxy struct {
	// kafkaClient is the client sendEvent uses to publish events.
	kafkaClient kafka.Client
	// eventTopic is the topic every event is sent to.
	eventTopic kafka.Topic
}
38
39func NewEventProxy(opts ...EventProxyOption) *EventProxy {
40 var proxy EventProxy
41 for _, option := range opts {
42 option(&proxy)
43 }
44 return &proxy
45}
46
// EventProxyOption mutates an EventProxy during construction; NewEventProxy
// invokes each option it is given on the proxy being built.
type EventProxyOption func(*EventProxy)
48
49func MsgClient(client kafka.Client) EventProxyOption {
50 return func(args *EventProxy) {
51 args.kafkaClient = client
52 }
53}
54
55func MsgTopic(topic kafka.Topic) EventProxyOption {
56 return func(args *EventProxy) {
57 args.eventTopic = topic
58 }
59}
60
61func (ep *EventProxy) formatId(eventName string) string {
62 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
63}
64
65func (ep *EventProxy) getEventHeader(eventName string,
66 category eventif.EventCategory,
67 subCategory *eventif.EventSubCategory,
68 eventType eventif.EventType,
69 raisedTs int64) (*voltha.EventHeader, error) {
70 var header voltha.EventHeader
71 if strings.Contains(eventName, "_") {
72 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
73 } else {
74 eventName = "UNKNOWN_EVENT"
75 }
76 /* Populating event header */
77 header.Id = ep.formatId(eventName)
78 header.Category = category
79 if subCategory != nil {
80 header.SubCategory = *subCategory
81 } else {
82 header.SubCategory = voltha.EventSubCategory_NONE
83 }
84 header.Type = eventType
85 header.TypeVersion = eventif.EventTypeVersion
86
87 // raisedTs is in nanoseconds
88 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
89 if err != nil {
90 return nil, err
91 }
92 header.RaisedTs = timestamp
93
94 timestamp, err = ptypes.TimestampProto(time.Now())
95 if err != nil {
96 return nil, err
97 }
98 header.ReportedTs = timestamp
99
100 return &header, nil
101}
102
103/* Send out rpc events*/
104func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
105 if rpcEvent == nil {
106 logger.Error(ctx, "Received empty rpc event")
107 return errors.New("rpc event nil")
108 }
109 var event voltha.Event
110 var err error
111 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
112 return err
113 }
114 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
115 if err := ep.sendEvent(ctx, &event); err != nil {
116 logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
117 return err
118 }
119 logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
120 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
121 "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
122 "RPCEventName": id})
123
124 return nil
125
126}
127
128/* Send out device events*/
129func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
130 if deviceEvent == nil {
131 logger.Error(ctx, "Recieved empty device event")
132 return errors.New("Device event nil")
133 }
134 var event voltha.Event
135 var de voltha.Event_DeviceEvent
136 var err error
137 de.DeviceEvent = deviceEvent
138 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
139 return err
140 }
141 event.EventType = &de
142 if err := ep.sendEvent(ctx, &event); err != nil {
143 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
144 return err
145 }
146 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
147 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
148 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
149 "DeviceEventName": deviceEvent.DeviceEventName})
150
151 return nil
152
153}
154
155// SendKpiEvent is to send kpi events to voltha.event topic
156func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
157 if kpiEvent == nil {
158 logger.Error(ctx, "Recieved empty kpi event")
159 return errors.New("KPI event nil")
160 }
161 var event voltha.Event
162 var de voltha.Event_KpiEvent2
163 var err error
164 de.KpiEvent2 = kpiEvent
165 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
166 return err
167 }
168 event.EventType = &de
169 if err := ep.sendEvent(ctx, &event); err != nil {
170 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
171 return err
172 }
173 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
174 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
175 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
176
177 return nil
178
179}
180
181func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
182 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
183 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
184 return err
185 }
186 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
187
188 return nil
189}