blob: b79bafe88980612441584ae7c7d9dc87ae64fe51 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package common
18
19import (
Neha Sharma3c425fb2020-06-08 16:42:32 +000020 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070021 "errors"
22 "fmt"
23 "strconv"
24 "strings"
25 "time"
26
Scott Baker8e2be6b2020-02-10 17:27:15 -080027 "github.com/golang/protobuf/ptypes"
serkant.uluderyab38671c2019-11-01 09:35:38 -070028 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
29 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
30 "github.com/opencord/voltha-lib-go/v3/pkg/log"
31 "github.com/opencord/voltha-protos/v3/go/voltha"
Scott Baker2c1c4822019-10-16 11:02:41 -070032)
33
34type EventProxy struct {
35 kafkaClient kafka.Client
36 eventTopic kafka.Topic
37}
38
39func NewEventProxy(opts ...EventProxyOption) *EventProxy {
40 var proxy EventProxy
41 for _, option := range opts {
42 option(&proxy)
43 }
44 return &proxy
45}
46
47type EventProxyOption func(*EventProxy)
48
49func MsgClient(client kafka.Client) EventProxyOption {
50 return func(args *EventProxy) {
51 args.kafkaClient = client
52 }
53}
54
55func MsgTopic(topic kafka.Topic) EventProxyOption {
56 return func(args *EventProxy) {
57 args.eventTopic = topic
58 }
59}
60
61func (ep *EventProxy) formatId(eventName string) string {
62 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
63}
64
Scott Baker8e2be6b2020-02-10 17:27:15 -080065func (ep *EventProxy) getEventHeader(eventName string,
66 category adapterif.EventCategory,
67 subCategory adapterif.EventSubCategory,
68 eventType adapterif.EventType,
69 raisedTs int64) (*voltha.EventHeader, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -070070 var header voltha.EventHeader
71 if strings.Contains(eventName, "_") {
72 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
73 } else {
74 eventName = "UNKNOWN_EVENT"
75 }
76 /* Populating event header */
77 header.Id = ep.formatId(eventName)
78 header.Category = category
79 header.SubCategory = subCategory
80 header.Type = eventType
81 header.TypeVersion = adapterif.EventTypeVersion
Scott Baker8e2be6b2020-02-10 17:27:15 -080082
83 // raisedTs is in nanoseconds
84 timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
85 if err != nil {
86 return nil, err
87 }
88 header.RaisedTs = timestamp
89
90 timestamp, err = ptypes.TimestampProto(time.Now())
91 if err != nil {
92 return nil, err
93 }
94 header.ReportedTs = timestamp
95
96 return &header, nil
Scott Baker2c1c4822019-10-16 11:02:41 -070097}
98
99/* Send out device events*/
Neha Sharma3c425fb2020-06-08 16:42:32 +0000100func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700101 if deviceEvent == nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000102 logger.Error(ctx, "Recieved empty device event")
Scott Baker2c1c4822019-10-16 11:02:41 -0700103 return errors.New("Device event nil")
104 }
105 var event voltha.Event
106 var de voltha.Event_DeviceEvent
Scott Baker8e2be6b2020-02-10 17:27:15 -0800107 var err error
Scott Baker2c1c4822019-10-16 11:02:41 -0700108 de.DeviceEvent = deviceEvent
Scott Baker8e2be6b2020-02-10 17:27:15 -0800109 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
110 return err
111 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700112 event.EventType = &de
Neha Sharma3c425fb2020-06-08 16:42:32 +0000113 if err := ep.sendEvent(ctx, &event); err != nil {
114 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Scott Baker2c1c4822019-10-16 11:02:41 -0700115 return err
116 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000117 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Scott Baker2c1c4822019-10-16 11:02:41 -0700118 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
119 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
120 "DeviceEventName": deviceEvent.DeviceEventName})
121
122 return nil
123
124}
125
Naga Manjunatha59c9152019-10-30 12:48:49 +0530126// SendKpiEvent is to send kpi events to voltha.event topic
Neha Sharma3c425fb2020-06-08 16:42:32 +0000127func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530128 if kpiEvent == nil {
Neha Sharma3c425fb2020-06-08 16:42:32 +0000129 logger.Error(ctx, "Recieved empty kpi event")
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530130 return errors.New("KPI event nil")
131 }
132 var event voltha.Event
133 var de voltha.Event_KpiEvent2
Scott Baker8e2be6b2020-02-10 17:27:15 -0800134 var err error
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530135 de.KpiEvent2 = kpiEvent
Scott Baker8e2be6b2020-02-10 17:27:15 -0800136 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
137 return err
138 }
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530139 event.EventType = &de
Neha Sharma3c425fb2020-06-08 16:42:32 +0000140 if err := ep.sendEvent(ctx, &event); err != nil {
141 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530142 return err
143 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000144 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530145 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
146 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
147
148 return nil
149
150}
151
Scott Baker2c1c4822019-10-16 11:02:41 -0700152/* TODO: Send out KPI events*/
153
Neha Sharma3c425fb2020-06-08 16:42:32 +0000154func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
155 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700156 return err
157 }
Neha Sharma3c425fb2020-06-08 16:42:32 +0000158 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Scott Baker2c1c4822019-10-16 11:02:41 -0700159
160 return nil
161}