blob: 910fec3ebfa6c0f9fc565e1bdc2ce322f2f14f7e [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Himani Chawlacd407802020-12-10 12:08:59 +053017package events
Devmalya Paulfb990a52019-07-09 10:01:49 -040018
19import (
Gamze Abakacb0e6772021-06-10 08:32:12 +000020 "container/ring"
Neha Sharma96b7bf22020-06-15 10:37:32 +000021 "context"
Devmalya Paulfb990a52019-07-09 10:01:49 -040022 "errors"
23 "fmt"
Devmalya Paulfb990a52019-07-09 10:01:49 -040024 "strconv"
25 "strings"
Gamze Abakacb0e6772021-06-10 08:32:12 +000026 "sync"
Devmalya Paulfb990a52019-07-09 10:01:49 -040027 "time"
Devmalya Paulfb990a52019-07-09 10:01:49 -040028
Scott Baker589724f2020-02-10 17:56:58 -080029 "github.com/golang/protobuf/ptypes"
Himani Chawlacd407802020-12-10 12:08:59 +053030 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
Girish Gowdraa09aeab2020-09-14 16:30:52 -070031 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v4/pkg/log"
33 "github.com/opencord/voltha-protos/v4/go/voltha"
Devmalya Paulfb990a52019-07-09 10:01:49 -040034)
35
const (
	// EVENT_THRESHOLD is the number of slots in the ring that buffers
	// queued events (see newEventQueue).
	// TODO: Make configurable through helm chart
	EVENT_THRESHOLD = 1000
)

// lastEvent is the sentinel element pushed by EventQueue.stop. push closes
// the queue for further inserts when it sees it, and EventProxy.Start
// cancels the queue context and exits when it receives it.
type lastEvent struct{}
40
Devmalya Paulfb990a52019-07-09 10:01:49 -040041type EventProxy struct {
Gamze Abakacb0e6772021-06-10 08:32:12 +000042 kafkaClient kafka.Client
43 eventTopic kafka.Topic
44 eventQueue *EventQueue
45 queueCtx context.Context
46 queueCancelCtx context.CancelFunc
Devmalya Paulfb990a52019-07-09 10:01:49 -040047}
48
49func NewEventProxy(opts ...EventProxyOption) *EventProxy {
50 var proxy EventProxy
51 for _, option := range opts {
52 option(&proxy)
53 }
Gamze Abakacb0e6772021-06-10 08:32:12 +000054 proxy.eventQueue = newEventQueue()
55 proxy.queueCtx, proxy.queueCancelCtx = context.WithCancel(context.Background())
Devmalya Paulfb990a52019-07-09 10:01:49 -040056 return &proxy
57}
58
59type EventProxyOption func(*EventProxy)
60
61func MsgClient(client kafka.Client) EventProxyOption {
62 return func(args *EventProxy) {
63 args.kafkaClient = client
64 }
65}
66
67func MsgTopic(topic kafka.Topic) EventProxyOption {
68 return func(args *EventProxy) {
69 args.eventTopic = topic
70 }
71}
72
73func (ep *EventProxy) formatId(eventName string) string {
74 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
75}
76
Scott Baker589724f2020-02-10 17:56:58 -080077func (ep *EventProxy) getEventHeader(eventName string,
Himani Chawlacd407802020-12-10 12:08:59 +053078 category eventif.EventCategory,
79 subCategory *eventif.EventSubCategory,
80 eventType eventif.EventType,
Scott Baker589724f2020-02-10 17:56:58 -080081 raisedTs int64) (*voltha.EventHeader, error) {
Devmalya Paulfb990a52019-07-09 10:01:49 -040082 var header voltha.EventHeader
83 if strings.Contains(eventName, "_") {
84 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
85 } else {
86 eventName = "UNKNOWN_EVENT"
87 }
88 /* Populating event header */
89 header.Id = ep.formatId(eventName)
90 header.Category = category
Himani Chawlacd407802020-12-10 12:08:59 +053091 if subCategory != nil {
92 header.SubCategory = *subCategory
serkant.uluderya7b8211e2021-02-24 16:39:18 +030093 } else {
94 header.SubCategory = voltha.EventSubCategory_NONE
Himani Chawlacd407802020-12-10 12:08:59 +053095 }
Devmalya Paulfb990a52019-07-09 10:01:49 -040096 header.Type = eventType
Himani Chawlacd407802020-12-10 12:08:59 +053097 header.TypeVersion = eventif.EventTypeVersion
Scott Baker589724f2020-02-10 17:56:58 -080098
Girish Gowdrac1b9d5e2021-04-22 12:47:44 -070099 // raisedTs is in seconds
100 timestamp, err := ptypes.TimestampProto(time.Unix(raisedTs, 0))
Scott Baker589724f2020-02-10 17:56:58 -0800101 if err != nil {
102 return nil, err
103 }
104 header.RaisedTs = timestamp
105
106 timestamp, err = ptypes.TimestampProto(time.Now())
107 if err != nil {
108 return nil, err
109 }
110 header.ReportedTs = timestamp
111
112 return &header, nil
Devmalya Paulfb990a52019-07-09 10:01:49 -0400113}
114
Himani Chawlacd407802020-12-10 12:08:59 +0530115/* Send out rpc events*/
116func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
117 if rpcEvent == nil {
118 logger.Error(ctx, "Received empty rpc event")
119 return errors.New("rpc event nil")
120 }
121 var event voltha.Event
122 var err error
123 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
124 return err
125 }
126 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
Gamze Abakacb0e6772021-06-10 08:32:12 +0000127 ep.eventQueue.push(&event)
Himani Chawlacd407802020-12-10 12:08:59 +0530128 return nil
129
130}
131
Devmalya Paulfb990a52019-07-09 10:01:49 -0400132/* Send out device events*/
Himani Chawlacd407802020-12-10 12:08:59 +0530133func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400134 if deviceEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000135 logger.Error(ctx, "Recieved empty device event")
Devmalya Paulfb990a52019-07-09 10:01:49 -0400136 return errors.New("Device event nil")
137 }
138 var event voltha.Event
139 var de voltha.Event_DeviceEvent
Scott Baker589724f2020-02-10 17:56:58 -0800140 var err error
Devmalya Paulfb990a52019-07-09 10:01:49 -0400141 de.DeviceEvent = deviceEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530142 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800143 return err
144 }
Devmalya Paulfb990a52019-07-09 10:01:49 -0400145 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000146 if err := ep.sendEvent(ctx, &event); err != nil {
147 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400148 return err
149 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000150 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Devmalya Paulfb990a52019-07-09 10:01:49 -0400151 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
152 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
153 "DeviceEventName": deviceEvent.DeviceEventName})
154
155 return nil
156
157}
158
Naga Manjunath7615e552019-10-11 22:35:47 +0530159// SendKpiEvent is to send kpi events to voltha.event topic
Himani Chawlacd407802020-12-10 12:08:59 +0530160func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Naga Manjunath7615e552019-10-11 22:35:47 +0530161 if kpiEvent == nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000162 logger.Error(ctx, "Recieved empty kpi event")
Naga Manjunath7615e552019-10-11 22:35:47 +0530163 return errors.New("KPI event nil")
164 }
165 var event voltha.Event
166 var de voltha.Event_KpiEvent2
Scott Baker589724f2020-02-10 17:56:58 -0800167 var err error
Naga Manjunath7615e552019-10-11 22:35:47 +0530168 de.KpiEvent2 = kpiEvent
Himani Chawlacd407802020-12-10 12:08:59 +0530169 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
Scott Baker589724f2020-02-10 17:56:58 -0800170 return err
171 }
Naga Manjunath7615e552019-10-11 22:35:47 +0530172 event.EventType = &de
Neha Sharma96b7bf22020-06-15 10:37:32 +0000173 if err := ep.sendEvent(ctx, &event); err != nil {
174 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath7615e552019-10-11 22:35:47 +0530175 return err
176 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000177 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath7615e552019-10-11 22:35:47 +0530178 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
179 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
180
181 return nil
182
183}
184
Neha Sharma96b7bf22020-06-15 10:37:32 +0000185func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
Himani Chawlacd407802020-12-10 12:08:59 +0530186 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
Neha Sharma96b7bf22020-06-15 10:37:32 +0000187 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
Devmalya Paulfb990a52019-07-09 10:01:49 -0400188 return err
189 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000190 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Devmalya Paulfb990a52019-07-09 10:01:49 -0400191
192 return nil
193}
Gamze Abaka7650be62021-02-26 10:50:36 +0000194
195func (ep *EventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
196 return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
197}
198
199func (ep *EventProxy) SendLiveness(ctx context.Context) error {
200 return ep.kafkaClient.SendLiveness(ctx)
201}
Gamze Abakacb0e6772021-06-10 08:32:12 +0000202
203// Start the event proxy
204func (ep *EventProxy) Start() {
205 eq := ep.eventQueue
206 go eq.start(ep.queueCtx)
207 logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
208 for {
209 // Notify the queue I am ready
210 eq.readyToSendToKafkaCh <- struct{}{}
211 // Wait for an event
212 elem, ok := <-eq.eventChannel
213 if !ok {
214 logger.Debug(context.Background(), "event-channel-closed-exiting")
215 break
216 }
217 // Check for last event
218 if _, ok := elem.(*lastEvent); ok {
219 // close the queuing loop
220 logger.Info(context.Background(), "received-last-event")
221 ep.queueCancelCtx()
222 break
223 }
224 ctx := context.Background()
225 event, ok := elem.(*voltha.Event)
226 if !ok {
227 logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
228 continue
229 }
230 if err := ep.sendEvent(ctx, event); err != nil {
231 logger.Errorw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
232 } else {
233 logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
234 "sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
235 "reported-ts": event.Header.ReportedTs, "event-type": event.EventType})
236 }
237 }
238}
239
240func (ep *EventProxy) Stop() {
241 ep.eventQueue.stop()
242}
243
// EventQueue is a ring of events shared between producers calling push and
// the goroutine running start, which hands elements to the kafka sending
// loop one at a time.
type EventQueue struct {
	// mutex guards insertPosition, popPosition and the values stored in the ring.
	mutex sync.RWMutex
	// eventChannel carries popped elements from start to the kafka sending loop.
	eventChannel chan interface{}
	// insertPosition is the slot the next push writes to; push sets it to
	// nil once the last-event sentinel has been stored.
	insertPosition *ring.Ring
	// popPosition is the slot the next element is taken from.
	popPosition *ring.Ring
	// dataToSendAvailable is signalled by push after an element is stored.
	dataToSendAvailable chan struct{}
	// readyToSendToKafkaCh is signalled by the kafka sending loop when it
	// can take another element.
	readyToSendToKafkaCh chan struct{}
	// eventQueueStopped is signalled by start when it terminates; stop waits on it.
	eventQueueStopped chan struct{}
}
253
254func newEventQueue() *EventQueue {
255 ev := &EventQueue{
256 eventChannel: make(chan interface{}),
257 insertPosition: ring.New(EVENT_THRESHOLD),
258 dataToSendAvailable: make(chan struct{}),
259 readyToSendToKafkaCh: make(chan struct{}),
260 eventQueueStopped: make(chan struct{}),
261 }
262 ev.popPosition = ev.insertPosition
263 return ev
264}
265
266// push is invoked to push an event at the back of a queue
267func (eq *EventQueue) push(event interface{}) {
268 eq.mutex.Lock()
269
270 if eq.insertPosition != nil {
271 // Handle Queue is full.
272 // TODO: Current default is to overwrite old data if queue is full. Is there a need to
273 // block caller if max threshold is reached?
274 if eq.insertPosition.Value != nil && eq.insertPosition == eq.popPosition {
275 eq.popPosition = eq.popPosition.Next()
276 }
277
278 // Insert data and move pointer to next empty position
279 eq.insertPosition.Value = event
280 eq.insertPosition = eq.insertPosition.Next()
281
282 // Check for last event
283 if _, ok := event.(*lastEvent); ok {
284 eq.insertPosition = nil
285 }
286 eq.mutex.Unlock()
287 // Notify waiting thread of data availability
288 eq.dataToSendAvailable <- struct{}{}
289
290 } else {
291 logger.Debug(context.Background(), "event-queue-is-closed-as-insert-position-is-cleared")
292 eq.mutex.Unlock()
293 }
294}
295
296// start starts the routine that extracts an element from the event queue and
297// send it to the kafka sending routine to process.
298func (eq *EventQueue) start(ctx context.Context) {
299 logger.Info(ctx, "starting-event-queue")
300loop:
301 for {
302 select {
303 case <-eq.dataToSendAvailable:
304 // Do nothing - use to prevent caller pushing data to block
305 case <-eq.readyToSendToKafkaCh:
306 {
307 // Kafka sending routine is ready to process an event
308 eq.mutex.Lock()
309 element := eq.popPosition.Value
310 if element == nil {
311 // No events to send. Wait
312 eq.mutex.Unlock()
313 select {
314 case _, ok := <-eq.dataToSendAvailable:
315 if !ok {
316 // channel closed
317 eq.eventQueueStopped <- struct{}{}
318 return
319 }
320 case <-ctx.Done():
321 logger.Info(ctx, "event-queue-context-done")
322 eq.eventQueueStopped <- struct{}{}
323 return
324 }
325 eq.mutex.Lock()
326 element = eq.popPosition.Value
327 }
328 eq.popPosition.Value = nil
329 eq.popPosition = eq.popPosition.Next()
330 eq.mutex.Unlock()
331 eq.eventChannel <- element
332 }
333 case <-ctx.Done():
334 logger.Info(ctx, "event-queue-context-done")
335 eq.eventQueueStopped <- struct{}{}
336 break loop
337 }
338 }
339 logger.Info(ctx, "event-queue-stopped")
340
341}
342
343func (eq *EventQueue) stop() {
344 // Flush all
345 eq.push(&lastEvent{})
346 <-eq.eventQueueStopped
347 eq.mutex.Lock()
348 close(eq.readyToSendToKafkaCh)
349 close(eq.dataToSendAvailable)
350 close(eq.eventChannel)
351 eq.mutex.Unlock()
352
353}