VOL-812: If fiber is pulled at ONU side, ONU LOS alarm will not get clear from kafka bus.
         ONU state is changed to active. (Refer to VOL-483)
         - When pon cable pulled out from OLT end, LoS event raised for Onu's connected to it
         - When pon cable connected back to OLT, Onu LoS Clear event raised for Onu's if
           Onu Discovery is received and Onu LoS already raised
         - When pon disconnection or connection from Onu end respective Onu LoS Raise/Clear
           Event raised
         - Included Onu serial number while formating onuAlarmIndication event context

Change-Id: Ic1e965da62fb3876ea896788c87d04f376af9e53
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index a44d639..82f300e 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -207,7 +207,11 @@
 }
 
 func (em *OpenOltEventMgr) oltLosIndication(oltLos *oop.LosIndication, deviceID string, raisedTs int64) error {
+	var err error = nil
 	var de voltha.DeviceEvent
+	var alarmInd oop.OnuAlarmIndication
+	ponIntdID := PortNoToIntfID(oltLos.IntfId, voltha.Port_PON_OLT)
+
 	context := make(map[string]string)
 	/* Populating event context */
 	context["intf-id"] = strconv.FormatUint(uint64(oltLos.IntfId), base10)
@@ -216,6 +220,24 @@
 	de.ResourceId = deviceID
 	if oltLos.Status == statusCheckOn {
 		de.DeviceEventName = fmt.Sprintf("%s_%s", oltLosEvent, "RAISE_EVENT")
+
+		/* When PON cable disconnected from OLT, it was expected OnuAlarmIndication
+		   with "los_status: on" should be raised for each Onu connected to the PON
+		   but BAL does not raise this Alarm hence manually sending OnuLosRaise event
+		   for all the ONU's connected to PON on receiving LoSIndication for PON */
+		em.handler.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
+			if onuInCache.(*OnuDevice).intfID == ponIntdID {
+				alarmInd.IntfId = ponIntdID
+				alarmInd.OnuId = onuInCache.(*OnuDevice).onuID
+				alarmInd.LosStatus = statusCheckOn
+				err = em.onuAlarmIndication(&alarmInd, deviceID, raisedTs)
+			}
+			return true
+		})
+		if err != nil {
+			/* Return if any error encountered while processing ONU LoS Event*/
+			return err
+		}
 	} else {
 		de.DeviceEventName = fmt.Sprintf("%s_%s", oltLosEvent, "CLEAR_EVENT")
 	}
@@ -254,40 +276,120 @@
 	return nil
 }
 
+//wasLosRaised checks whether los raised already. If already raised returns true else false
+func (em *OpenOltEventMgr) wasLosRaised(onuAlarm *oop.OnuAlarmIndication) bool {
+	onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
+	if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
+		log.Debugw("onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
+
+		if onuAlarm.LosStatus == statusCheckOn {
+			if onuInCache.(*OnuDevice).losRaised {
+				log.Warnw("onu-los-raised-already", log.Fields{"onu_id": onuAlarm.OnuId,
+					"intf_id": onuAlarm.IntfId, "LosStatus": onuAlarm.LosStatus})
+				return true
+			}
+			return false
+		}
+	}
+	return true
+}
+
+//wasLosCleared checks whether los cleared already. If already cleared returns true else false
+func (em *OpenOltEventMgr) wasLosCleared(onuAlarm *oop.OnuAlarmIndication) bool {
+	onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
+	if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
+		log.Debugw("onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
+
+		if onuAlarm.LosStatus == statusCheckOff {
+			if !onuInCache.(*OnuDevice).losRaised {
+				log.Warnw("onu-los-cleared-already", log.Fields{"onu_id": onuAlarm.OnuId,
+					"intf_id": onuAlarm.IntfId, "LosStatus": onuAlarm.LosStatus})
+				return true
+			}
+			return false
+		}
+	}
+	return true
+}
+
+func (em *OpenOltEventMgr) getDeviceEventName(onuAlarm *oop.OnuAlarmIndication) string {
+	var deviceEventName string
+	if onuAlarm.LosStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "RAISE_EVENT")
+	} else if onuAlarm.LosStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "CLEAR_EVENT")
+	} else if onuAlarm.LobStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "RAISE_EVENT")
+	} else if onuAlarm.LobStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "CLEAR_EVENT")
+	} else if onuAlarm.LopcMissStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "RAISE_EVENT")
+	} else if onuAlarm.LopcMissStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "CLEAR_EVENT")
+	} else if onuAlarm.LopcMicErrorStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "RAISE_EVENT")
+	} else if onuAlarm.LopcMicErrorStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "CLEAR_EVENT")
+	} else if onuAlarm.LofiStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLossOfFrameEvent, "RAISE_EVENT")
+	} else if onuAlarm.LofiStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLossOfFrameEvent, "CLEAR_EVENT")
+	} else if onuAlarm.LoamiStatus == statusCheckOn {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLossOfPloamEvent, "RAISE_EVENT")
+	} else if onuAlarm.LoamiStatus == statusCheckOff {
+		deviceEventName = fmt.Sprintf("%s_%s", onuLossOfPloamEvent, "CLEAR_EVENT")
+	}
+	return deviceEventName
+}
+
 func (em *OpenOltEventMgr) onuAlarmIndication(onuAlarm *oop.OnuAlarmIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
+	var serialNumber string
 	context := make(map[string]string)
 	/* Populating event context */
 	context["intf-id"] = strconv.FormatUint(uint64(onuAlarm.IntfId), base10)
 	context["onu-id"] = strconv.FormatUint(uint64(onuAlarm.OnuId), base10)
+	serialNumber = ""
+	onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
+	if onu, ok := em.handler.onus.Load(onuKey); ok {
+		serialNumber = onu.(*OnuDevice).serialNumber
+	}
+	context["serial-number"] = serialNumber
+
 	/* Populating device event body */
 	de.Context = context
 	de.ResourceId = deviceID
-	if onuAlarm.LosStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "RAISE_EVENT")
-	} else if onuAlarm.LosStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "CLEAR_EVENT")
-	} else if onuAlarm.LobStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "RAISE_EVENT")
-	} else if onuAlarm.LobStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "CLEAR_EVENT")
-	} else if onuAlarm.LopcMissStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "RAISE_EVENT")
-	} else if onuAlarm.LopcMissStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "CLEAR_EVENT")
-	} else if onuAlarm.LopcMicErrorStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "RAISE_EVENT")
-	} else if onuAlarm.LopcMicErrorStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "CLEAR_EVENT")
-	} else if onuAlarm.LofiStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfFrameEvent, "RAISE_EVENT")
-	} else if onuAlarm.LofiStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfFrameEvent, "CLEAR_EVENT")
-	} else if onuAlarm.LoamiStatus == statusCheckOn {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfPloamEvent, "RAISE_EVENT")
-	} else if onuAlarm.LoamiStatus == statusCheckOff {
-		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfPloamEvent, "CLEAR_EVENT")
+	de.DeviceEventName = em.getDeviceEventName(onuAlarm)
+
+	switch onuAlarm.LosStatus {
+	case statusCheckOn:
+		if em.wasLosRaised(onuAlarm) {
+			/* No need to raise Onu Los Event as it might have already raised
+			   or Onu might have deleted */
+			return nil
+		}
+		onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
+		if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
+			/* Update onu device with LoS raised state as true */
+			em.handler.onus.Store(onuKey, NewOnuDevice(onuInCache.(*OnuDevice).deviceID, onuInCache.(*OnuDevice).deviceType,
+				onuInCache.(*OnuDevice).serialNumber, onuInCache.(*OnuDevice).onuID, onuInCache.(*OnuDevice).intfID,
+				onuInCache.(*OnuDevice).proxyDeviceID, true))
+		}
+	case statusCheckOff:
+		if em.wasLosCleared(onuAlarm) {
+			/* No need to clear Onu Los Event as it might have already cleared
+			   or Onu might have deleted */
+			return nil
+		}
+		onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
+		if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
+			/* Update onu device with LoS raised state as false */
+			em.handler.onus.Store(onuKey, NewOnuDevice(onuInCache.(*OnuDevice).deviceID, onuInCache.(*OnuDevice).deviceType,
+				onuInCache.(*OnuDevice).serialNumber, onuInCache.(*OnuDevice).onuID, onuInCache.(*OnuDevice).intfID,
+				onuInCache.(*OnuDevice).proxyDeviceID, false))
+		}
 	}
+
 	/* Send event to KAFKA */
 	if err := em.eventProxy.SendDeviceEvent(&de, communication, onu, raisedTs); err != nil {
 		log.Errorw("Failed to send ONU Los event", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})