VOL-1537 : Create the Alarm Framework in golang openolt adapter
Event manager is added to process indications comming from the
OLT and publish them as generic events on to the KAFKA bus which
could be device alarms or KPIs.
It depends on the updated events.proto which contains the defination
for the generic event gRPC message and the event proxy from the VOLTHA
core.
So the changes in voltha-proto needs to be merged first followed by the
changes in voltha-go and then voltha-openolt-adapter.
Change-Id: Ie38b2ea01bd738737522c018e65e685ee41589d5
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index fef55bf..20701ab 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -51,6 +51,7 @@
device *voltha.Device
coreProxy *com.CoreProxy
AdapterProxy *com.AdapterProxy
+ EventProxy *com.EventProxy
openOLT *OpenOLT
exitChannel chan int
lockDevice sync.RWMutex
@@ -58,6 +59,7 @@
transitionMap *TransitionMap
clientCon *grpc.ClientConn
flowMgr *OpenOltFlowMgr
+ eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
discOnus map[string]bool
onus map[string]*OnuDevice
@@ -87,10 +89,11 @@
}
//NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, ep *com.EventProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
var dh DeviceHandler
dh.coreProxy = cp
dh.AdapterProxy = ap
+ dh.EventProxy = ep
cloned := (proto.Clone(device)).(*voltha.Device)
dh.deviceID = cloned.Id
dh.deviceType = cloned.Type
@@ -254,6 +257,7 @@
}
func (dh *DeviceHandler) handleIndication(indication *oop.Indication) {
+ raisedTs := time.Now().UnixNano()
switch indication.Data.(type) {
case *oop.Indication_OltInd:
dh.handleOltIndication(indication.GetOltInd())
@@ -298,6 +302,8 @@
case *oop.Indication_AlarmInd:
alarmInd := indication.GetAlarmInd()
log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+ dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
+
}
}
@@ -463,6 +469,8 @@
return errors.New("instantiating flow manager failed")
}
/* TODO: Instantiate Alarm , stats , BW managers */
+ /* Instantiating Event Manager to handle Alarms and KPIs */
+ dh.eventMgr = NewEventMgr(dh.EventProxy)
// Start reading indications
go dh.readIndications()