SEBA-949 Support for publishing BBSim events on Kafka

Change-Id: I4354cd026bbadc801e4d6d08b2f9cd3462917b4c
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index d5ba84f..fa0ba5d 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -61,6 +61,8 @@
 	Flows                map[FlowKey]openolt.Flow
 	Delay                int
 	ControlledActivation mode
+	EventChannel         chan common.Event
+	PublishEvents        bool
 
 	Pons []*PonPort
 	Nnis []*NniPort
@@ -82,7 +84,7 @@
 	return &olt
 }
 
-func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, isMock bool) *OltDevice {
+func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, event bool, isMock bool) *OltDevice {
 	oltLogger.WithFields(log.Fields{
 		"ID":           oltId,
 		"NumNni":       nni,
@@ -104,6 +106,7 @@
 		Delay:        delay,
 		Flows:        make(map[FlowKey]openolt.Flow),
 		enablePerf:   enablePerf,
+		PublishEvents: event,
 	}
 
 	if val, ok := ControlledActivationModes[ca]; ok {
@@ -165,6 +168,12 @@
 		}
 	}
 
+	if olt.PublishEvents {
+		log.Debugf("BBSim event publishing is enabled")
+		// Create a channel to write event messages
+		olt.EventChannel = make(chan common.Event, 100)
+	}
+
 	return &olt
 }
 
@@ -728,6 +737,7 @@
 	oltLogger.WithFields(log.Fields{
 		"OnuSn": onuSnToString(onu.SerialNumber),
 	}).Info("Received ActivateOnu call from VOLTHA")
+	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
 
 	pon, _ := o.GetPonById(onu.IntfId)
 	_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
@@ -797,6 +807,7 @@
 	oltLogger.WithFields(log.Fields{
 		"oltId": o.ID,
 	}).Info("Disabling OLT")
+	publishEvent("OLT-disable-received", -1, -1, "")
 
 	for _, pon := range o.Pons {
 		if pon.InternalState.Current() == "enabled" {
@@ -833,6 +844,7 @@
 
 func (o *OltDevice) EnableIndication(_ *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
 	oltLogger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA")
+	publishEvent("OLT-enable-received", -1, -1, "")
 	o.Enable(stream)
 	return nil
 }
@@ -1051,6 +1063,7 @@
 	oltLogger.WithFields(log.Fields{
 		"oltId": o.ID,
 	}).Info("Shutting down")
+	publishEvent("OLT-reboot-received", -1, -1, "")
 	go o.RestartOLT()
 	return new(openolt.Empty), nil
 }
@@ -1059,6 +1072,7 @@
 	oltLogger.WithFields(log.Fields{
 		"oltId": o.ID,
 	}).Info("Received ReenableOlt request from VOLTHA")
+	publishEvent("OLT-reenable-received", -1, -1, "")
 
 	// enable OLT
 	oltMsg := Message{