SEBA-949: Add support for publishing BBSim events on Kafka
Change-Id: I4354cd026bbadc801e4d6d08b2f9cd3462917b4c
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index d5ba84f..fa0ba5d 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -61,6 +61,8 @@
Flows map[FlowKey]openolt.Flow
Delay int
ControlledActivation mode
+ EventChannel chan common.Event
+ PublishEvents bool
Pons []*PonPort
Nnis []*NniPort
@@ -82,7 +84,7 @@
return &olt
}
-func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, isMock bool) *OltDevice {
+func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, event bool, isMock bool) *OltDevice {
oltLogger.WithFields(log.Fields{
"ID": oltId,
"NumNni": nni,
@@ -104,6 +106,7 @@
Delay: delay,
Flows: make(map[FlowKey]openolt.Flow),
enablePerf: enablePerf,
+ PublishEvents: event,
}
if val, ok := ControlledActivationModes[ca]; ok {
@@ -165,6 +168,12 @@
}
}
+ if olt.PublishEvents {
+ log.Debugf("BBSim event publishing is enabled")
+ // Create a channel to write event messages
+ olt.EventChannel = make(chan common.Event, 100)
+ }
+
return &olt
}
@@ -728,6 +737,7 @@
oltLogger.WithFields(log.Fields{
"OnuSn": onuSnToString(onu.SerialNumber),
}).Info("Received ActivateOnu call from VOLTHA")
+ publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
pon, _ := o.GetPonById(onu.IntfId)
_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
@@ -797,6 +807,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Disabling OLT")
+ publishEvent("OLT-disable-received", -1, -1, "")
for _, pon := range o.Pons {
if pon.InternalState.Current() == "enabled" {
@@ -833,6 +844,7 @@
func (o *OltDevice) EnableIndication(_ *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
oltLogger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA")
+ publishEvent("OLT-enable-received", -1, -1, "")
o.Enable(stream)
return nil
}
@@ -1051,6 +1063,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Shutting down")
+ publishEvent("OLT-reboot-received", -1, -1, "")
go o.RestartOLT()
return new(openolt.Empty), nil
}
@@ -1059,6 +1072,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Received ReenableOlt request from VOLTHA")
+ publishEvent("OLT-reenable-received", -1, -1, "")
// enable OLT
oltMsg := Message{