blob: 4b46854c563f25854c0ca31c6bb504f5675ec9f0 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package events
18
19import (
20 "container/ring"
21 "context"
22 "errors"
23 "fmt"
24 "strconv"
25 "strings"
26 "sync"
27 "time"
28
29 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
30 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
31 "github.com/opencord/voltha-lib-go/v7/pkg/log"
32 "github.com/opencord/voltha-protos/v5/go/voltha"
33 "google.golang.org/protobuf/types/known/timestamppb"
34)
35
36// TODO: Make configurable through helm chart
37const EVENT_THRESHOLD = 1000
38
39type lastEvent struct{}
40
41type EventProxy struct {
42 kafkaClient kafka.Client
43 eventTopic kafka.Topic
44 eventQueue *EventQueue
45 queueCtx context.Context
46 queueCancelCtx context.CancelFunc
47}
48
49func NewEventProxy(opts ...EventProxyOption) *EventProxy {
50 var proxy EventProxy
51 for _, option := range opts {
52 option(&proxy)
53 }
54 proxy.eventQueue = newEventQueue()
55 proxy.queueCtx, proxy.queueCancelCtx = context.WithCancel(context.Background())
56 return &proxy
57}
58
59type EventProxyOption func(*EventProxy)
60
61func MsgClient(client kafka.Client) EventProxyOption {
62 return func(args *EventProxy) {
63 args.kafkaClient = client
64 }
65}
66
67func MsgTopic(topic kafka.Topic) EventProxyOption {
68 return func(args *EventProxy) {
69 args.eventTopic = topic
70 }
71}
72
73func (ep *EventProxy) formatId(eventName string) string {
74 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
75}
76
77func (ep *EventProxy) getEventHeader(eventName string,
78 category eventif.EventCategory,
79 subCategory *eventif.EventSubCategory,
80 eventType eventif.EventType,
81 raisedTs int64) (*voltha.EventHeader, error) {
82 var header voltha.EventHeader
83 if strings.Contains(eventName, "_") {
84 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
85 } else {
86 eventName = "UNKNOWN_EVENT"
87 }
88 /* Populating event header */
89 header.Id = ep.formatId(eventName)
90 header.Category = category
91 if subCategory != nil {
92 header.SubCategory = *subCategory
93 } else {
94 header.SubCategory = voltha.EventSubCategory_NONE
95 }
96 header.Type = eventType
97 header.TypeVersion = eventif.EventTypeVersion
98
99 // raisedTs is in seconds
100 header.RaisedTs = timestamppb.New(time.Unix(raisedTs, 0))
101 header.ReportedTs = timestamppb.New(time.Now())
102
103 return &header, nil
104}
105
106/* Send out rpc events*/
107func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
108 if rpcEvent == nil {
109 logger.Error(ctx, "Received empty rpc event")
110 return errors.New("rpc event nil")
111 }
112 var event voltha.Event
113 var err error
114 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
115 return err
116 }
117 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
118 ep.eventQueue.push(&event)
119 return nil
120
121}
122
123/* Send out device events*/
124func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
kesavand92fac102022-03-16 12:33:06 +0530125 return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
126}
127
128/* Send out device events with key*/
129func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
khenaidood948f772021-08-11 17:49:24 -0400130 if deviceEvent == nil {
131 logger.Error(ctx, "Recieved empty device event")
132 return errors.New("Device event nil")
133 }
134 var event voltha.Event
135 var de voltha.Event_DeviceEvent
136 var err error
137 de.DeviceEvent = deviceEvent
138 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
139 return err
140 }
141 event.EventType = &de
kesavand92fac102022-03-16 12:33:06 +0530142
143 if err := ep.sendEvent(ctx, &event, key); err != nil {
khenaidood948f772021-08-11 17:49:24 -0400144 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
145 return err
146 }
kesavand92fac102022-03-16 12:33:06 +0530147 logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
khenaidood948f772021-08-11 17:49:24 -0400148 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
149 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
150 "DeviceEventName": deviceEvent.DeviceEventName})
151
152 return nil
153
154}
155
156// SendKpiEvent is to send kpi events to voltha.event topic
157func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
158 if kpiEvent == nil {
159 logger.Error(ctx, "Recieved empty kpi event")
160 return errors.New("KPI event nil")
161 }
162 var event voltha.Event
163 var de voltha.Event_KpiEvent2
164 var err error
165 de.KpiEvent2 = kpiEvent
166 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
167 return err
168 }
169 event.EventType = &de
kesavand92fac102022-03-16 12:33:06 +0530170
171 if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
khenaidood948f772021-08-11 17:49:24 -0400172 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
173 return err
174 }
175 logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
176 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
177 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
178
179 return nil
180
181}
182
kesavand92fac102022-03-16 12:33:06 +0530183func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
khenaidood948f772021-08-11 17:49:24 -0400184 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
kesavand92fac102022-03-16 12:33:06 +0530185 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
khenaidood948f772021-08-11 17:49:24 -0400186 return err
187 }
188 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
189
190 return nil
191}
192
193func (ep *EventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
194 return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
195}
196
197func (ep *EventProxy) SendLiveness(ctx context.Context) error {
198 return ep.kafkaClient.SendLiveness(ctx)
199}
200
201// Start the event proxy
202func (ep *EventProxy) Start() error {
kesavand92fac102022-03-16 12:33:06 +0530203 if !ep.eventTopicExits(context.Background()) {
204 logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
205 return fmt.Errorf("event topic doesn't exist in kafka")
206 }
207
khenaidood948f772021-08-11 17:49:24 -0400208 eq := ep.eventQueue
kesavand92fac102022-03-16 12:33:06 +0530209
khenaidood948f772021-08-11 17:49:24 -0400210 go eq.start(ep.queueCtx)
211 logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
212 for {
213 // Notify the queue I am ready
214 eq.readyToSendToKafkaCh <- struct{}{}
215 // Wait for an event
216 elem, ok := <-eq.eventChannel
217 if !ok {
218 logger.Debug(context.Background(), "event-channel-closed-exiting")
219 break
220 }
221 // Check for last event
222 if _, ok := elem.(*lastEvent); ok {
223 // close the queuing loop
224 logger.Info(context.Background(), "received-last-event")
225 ep.queueCancelCtx()
226 break
227 }
228 ctx := context.Background()
229 event, ok := elem.(*voltha.Event)
230 if !ok {
231 logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
232 continue
233 }
kesavand92fac102022-03-16 12:33:06 +0530234 if err := ep.sendEvent(ctx, event, ""); err != nil {
235 logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
khenaidood948f772021-08-11 17:49:24 -0400236 } else {
237 logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
238 "sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
239 "reported-ts": event.Header.ReportedTs, "event-type": event.EventType})
240 }
241 }
242 return nil
243}
244
245func (ep *EventProxy) Stop() {
246 if ep.eventQueue != nil {
247 ep.eventQueue.stop()
248 }
249}
250
251type EventQueue struct {
252 mutex sync.RWMutex
253 eventChannel chan interface{}
254 insertPosition *ring.Ring
255 popPosition *ring.Ring
256 dataToSendAvailable chan struct{}
257 readyToSendToKafkaCh chan struct{}
258 eventQueueStopped chan struct{}
259}
260
261func newEventQueue() *EventQueue {
262 ev := &EventQueue{
263 eventChannel: make(chan interface{}),
264 insertPosition: ring.New(EVENT_THRESHOLD),
265 dataToSendAvailable: make(chan struct{}),
266 readyToSendToKafkaCh: make(chan struct{}),
267 eventQueueStopped: make(chan struct{}),
268 }
269 ev.popPosition = ev.insertPosition
270 return ev
271}
272
273// push is invoked to push an event at the back of a queue
274func (eq *EventQueue) push(event interface{}) {
275 eq.mutex.Lock()
276
277 if eq.insertPosition != nil {
278 // Handle Queue is full.
279 // TODO: Current default is to overwrite old data if queue is full. Is there a need to
280 // block caller if max threshold is reached?
281 if eq.insertPosition.Value != nil && eq.insertPosition == eq.popPosition {
282 eq.popPosition = eq.popPosition.Next()
283 }
284
285 // Insert data and move pointer to next empty position
286 eq.insertPosition.Value = event
287 eq.insertPosition = eq.insertPosition.Next()
288
289 // Check for last event
290 if _, ok := event.(*lastEvent); ok {
291 eq.insertPosition = nil
292 }
293 eq.mutex.Unlock()
294 // Notify waiting thread of data availability
295 eq.dataToSendAvailable <- struct{}{}
296
297 } else {
298 logger.Debug(context.Background(), "event-queue-is-closed-as-insert-position-is-cleared")
299 eq.mutex.Unlock()
300 }
301}
302
303// start starts the routine that extracts an element from the event queue and
304// send it to the kafka sending routine to process.
305func (eq *EventQueue) start(ctx context.Context) {
306 logger.Info(ctx, "starting-event-queue")
307loop:
308 for {
309 select {
310 case <-eq.dataToSendAvailable:
311 // Do nothing - use to prevent caller pushing data to block
312 case <-eq.readyToSendToKafkaCh:
313 {
314 // Kafka sending routine is ready to process an event
315 eq.mutex.Lock()
316 element := eq.popPosition.Value
317 if element == nil {
318 // No events to send. Wait
319 eq.mutex.Unlock()
320 select {
321 case _, ok := <-eq.dataToSendAvailable:
322 if !ok {
323 // channel closed
324 eq.eventQueueStopped <- struct{}{}
325 return
326 }
327 case <-ctx.Done():
328 logger.Info(ctx, "event-queue-context-done")
329 eq.eventQueueStopped <- struct{}{}
330 return
331 }
332 eq.mutex.Lock()
333 element = eq.popPosition.Value
334 }
335 eq.popPosition.Value = nil
336 eq.popPosition = eq.popPosition.Next()
337 eq.mutex.Unlock()
338 eq.eventChannel <- element
339 }
340 case <-ctx.Done():
341 logger.Info(ctx, "event-queue-context-done")
342 eq.eventQueueStopped <- struct{}{}
343 break loop
344 }
345 }
346 logger.Info(ctx, "event-queue-stopped")
347
348}
349
350func (eq *EventQueue) stop() {
351 // Flush all
352 eq.push(&lastEvent{})
353 <-eq.eventQueueStopped
354 eq.mutex.Lock()
355 close(eq.readyToSendToKafkaCh)
356 close(eq.dataToSendAvailable)
357 close(eq.eventChannel)
358 eq.mutex.Unlock()
359
360}
kesavand92fac102022-03-16 12:33:06 +0530361
362func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
363
364 // check if voltha.events topic exists
365 topics, err := ep.kafkaClient.ListTopics(ctx)
366 if err != nil {
367 logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
368 return false
369 }
370
371 logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
372 for _, topic := range topics {
373 if topic == ep.eventTopic.Name {
374 return true
375 }
376 }
377 return false
378}