blob: 1f14b3a502b753f0b2fea7fa06dd4a707e3c42d0 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package common
18
19import (
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 "github.com/opencord/voltha-protos/go/voltha"
25 "strconv"
26 "strings"
27 "time"
28)
29
// EventTypeVersion is the value written into the TypeVersion field of every
// event header built by this package.
const EventTypeVersion = "0.1"
33
34type (
35 EventType = voltha.EventType_EventType
36 EventCategory = voltha.EventCategory_EventCategory
37 EventSubCategory = voltha.EventSubCategory_EventSubCategory
38)
39
40type EventProxy struct {
41 kafkaClient kafka.Client
42 eventTopic kafka.Topic
43}
44
45func NewEventProxy(opts ...EventProxyOption) *EventProxy {
46 var proxy EventProxy
47 for _, option := range opts {
48 option(&proxy)
49 }
50 return &proxy
51}
52
53type EventProxyOption func(*EventProxy)
54
55func MsgClient(client kafka.Client) EventProxyOption {
56 return func(args *EventProxy) {
57 args.kafkaClient = client
58 }
59}
60
61func MsgTopic(topic kafka.Topic) EventProxyOption {
62 return func(args *EventProxy) {
63 args.eventTopic = topic
64 }
65}
66
67func (ep *EventProxy) formatId(eventName string) string {
68 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
69}
70
71func (ep *EventProxy) getEventHeader(eventName string, category EventCategory, subCategory EventSubCategory, eventType EventType, raisedTs int64) *voltha.EventHeader {
72 var header voltha.EventHeader
73 if strings.Contains(eventName, "_") {
74 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
75 } else {
76 eventName = "UNKNOWN_EVENT"
77 }
78 /* Populating event header */
79 header.Id = ep.formatId(eventName)
80 header.Category = category
81 header.SubCategory = subCategory
82 header.Type = eventType
83 header.TypeVersion = EventTypeVersion
84 header.RaisedTs = float32(raisedTs)
85 header.ReportedTs = float32(time.Now().UnixNano())
86 return &header
87}
88
89/* Send out device events*/
90func (ep *EventProxy) SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category EventCategory, subCategory EventSubCategory, raisedTs int64) error {
91 if deviceEvent == nil {
92 log.Error("Recieved empty device event")
93 return errors.New("Device event nil")
94 }
95 var event voltha.Event
96 var de voltha.Event_DeviceEvent
97 de.DeviceEvent = deviceEvent
98 event.Header = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs)
99 event.EventType = &de
100 if err := ep.sendEvent(&event); err != nil {
101 log.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
102 return err
103 }
104 log.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
105 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
106 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
107 "DeviceEventName": deviceEvent.DeviceEventName})
108
109 return nil
110
111}
112
113/* TODO: Send out KPI events*/
114
115func (ep *EventProxy) sendEvent(event *voltha.Event) error {
116 if err := ep.kafkaClient.Send(event, &ep.eventTopic); err != nil {
117 return err
118 }
119 log.Debugw("Sent event to kafka", log.Fields{"event": event})
120
121 return nil
122}