reconcile onus and pon ports (VOL-4972)

Change-Id: Ic5d08180c34a291579f35c9608f5a9f05b93d3e2
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
old mode 100644
new mode 100755
index 66e2133..3eb3a08
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -772,11 +772,187 @@
 	}
 }
 
+func generateOnuIndication(intfID, onuID uint32, operState, adminState string) *oop.Indication {
+	onuInd := &oop.OnuIndication{
+		IntfId:     intfID,
+		OnuId:      onuID,
+		OperState:  operState,
+		AdminState: adminState,
+	}
+	indication := &oop.Indication{
+		Data: &oop.Indication_OnuInd{
+			OnuInd: onuInd,
+		},
+	}
+	return indication
+}
+
+func generateOnuAlarmIndication(intfID uint32, onuID uint32, losStatus string) *oop.AlarmIndication {
+	onuAlarmInd := &oop.OnuAlarmIndication{
+		IntfId:    intfID,
+		OnuId:     onuID,
+		LosStatus: losStatus,
+	}
+	alarmInd := &oop.AlarmIndication{
+		Data: &oop.AlarmIndication_OnuAlarmInd{
+			OnuAlarmInd: onuAlarmInd,
+		},
+	}
+	return alarmInd
+}
+func generatePonLosAlarmIndication(intfID uint32, losStatus string) *oop.AlarmIndication {
+
+	ponlosAlarmInd := &oop.LosIndication{
+		IntfId: intfID,
+		Status: losStatus,
+	}
+	alarmInd := &oop.AlarmIndication{
+		Data: &oop.AlarmIndication_LosInd{
+			LosInd: ponlosAlarmInd,
+		},
+	}
+	return alarmInd
+}
+func (dh *DeviceHandler) updateIntfOperStateAndRaiseIndication(ctx context.Context, operState string, intfID uint32) {
+	go func() {
+		if err := dh.addPort(ctx, intfID, voltha.Port_PON_OLT, operState, defaultPortSpeedMbps); err != nil {
+			_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-pon", "device-id": dh.device.Id}, err).Log()
+		}
+	}()
+
+	raisedTs := time.Now().Unix()
+	go dh.eventMgr.oltIntfOperIndication(ctx, &oop.IntfOperIndication{Type: "pon", IntfId: intfID, OperState: operState}, dh.device.Id, raisedTs)
+}
+
+func (dh *DeviceHandler) reconcileOnus(ctx context.Context) error {
+
+	onuDevicesFromCore, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
+	if err != nil {
+
+		logger.Error(ctx, "unable to fetch child device", log.Fields{"eeror": err})
+
+		return err
+	}
+	for _, onuDeviceFromCore := range onuDevicesFromCore.Items {
+
+		onuOperStatusFromCore := onuDeviceFromCore.OperStatus
+		onuConnectStatusFromCore := onuDeviceFromCore.ConnectStatus
+		intfID := plt.PortNoToIntfID(onuDeviceFromCore.ParentPortNo, voltha.Port_PON_OLT)
+
+		onuID := onuDeviceFromCore.ProxyAddress.OnuId
+		onuDeviceFromOlt, err := dh.getOnuInfo(ctx, intfID, &onuID)
+		if err != nil {
+			logger.Error(ctx, "unable to get onu object from olt agent", log.Fields{"eeror": err})
+
+		} else {
+			onuOperStatusFromOlt := onuDeviceFromOlt.GetState()
+			onuLosFromOlt := onuDeviceFromOlt.GetLosi()
+			switch {
+			case onuOperStatusFromOlt.String() == "ACTIVE" && onuOperStatusFromCore.String() != "ACTIVE":
+				OnuIndication := generateOnuIndication(intfID, onuID, "up", "up")
+				dh.putOnuIndicationToChannel(ctx, OnuIndication, intfID)
+
+			case onuLosFromOlt.String() == "ON" && onuConnectStatusFromCore.String() == "REACHABLE":
+				OnuIndication := generateOnuIndication(intfID, onuID, "down", "down") //check bal cli login notepad
+				alarmInd := generateOnuAlarmIndication(intfID, onuID, "on")
+				raisedTs := time.Now().Unix()
+				go dh.eventMgr.ProcessEvents(ctx, alarmInd, dh.device.Id, raisedTs)
+
+				dh.putOnuIndicationToChannel(ctx, OnuIndication, intfID)
+			}
+
+		}
+
+	}
+
+	return nil
+}
+
+func (dh *DeviceHandler) reconcilePonPorts(ctx context.Context) error { // need onuid and pon id
+	portsFromCore, err := dh.getAllPortsFromCore(ctx, &ca.PortFilter{
+		DeviceId: dh.device.Id,
+		PortType: voltha.Port_PON_OLT,
+	})
+	if err != nil {
+		logger.Error(ctx, "unable to fetch ports from core", log.Fields{"eeror": err})
+
+		return err
+
+	}
+	for _, portFromCore := range portsFromCore.Items {
+		portNum := portFromCore.GetPortNo()
+		intfID := plt.PortNoToIntfID(portNum, voltha.Port_PON_OLT)
+		portOperStatusFromCore := portFromCore.OperStatus
+		portAdminStateFromCore := portFromCore.AdminState
+		ponPortFromOlt, err := dh.getIntfInfo(ctx, intfID)
+		if err != nil {
+			logger.Error(ctx, "unable to get pon objects from olt agent", log.Fields{"eeror": err})
+		} else {
+			portLosFromOlt := ponPortFromOlt.GetLos()
+			portStateFromOlt := ponPortFromOlt.GetState()
+			if portOperStatusFromCore.String() == "ACTIVE" && portLosFromOlt.String() == "ON" {
+				logger.Debug(ctx, "port is active in core but los is fired from olt", log.Fields{
+					"portStateFromOlt":       portStateFromOlt.String(),
+					"portOperStatusFromCore": portOperStatusFromCore.String(),
+					"device-id":              dh.device.Id,
+					"port":                   portNum})
+				ponLosindication := generatePonLosAlarmIndication(intfID, "on")
+				raisedTs := time.Now().Unix()
+				go dh.eventMgr.ProcessEvents(ctx, ponLosindication, dh.device.Id, raisedTs)
+
+			}
+			switch {
+			case portStateFromOlt.String() == "ACTIVE_WORKING" && portOperStatusFromCore.String() != "ACTIVE":
+				logger.Debug(ctx, "mismatch between port state in voltha core and raising port up event", log.Fields{
+					"portStateFromOlt":       portStateFromOlt.String(),
+					"portOperStatusFromCore": portOperStatusFromCore.String(),
+					"device-id":              dh.device.Id,
+					"port":                   portNum})
+				dh.updateIntfOperStateAndRaiseIndication(ctx, "up", intfID)
+			case (portStateFromOlt.String() == "INACTIVE" || portStateFromOlt.String() == "UNKNOWN") && portOperStatusFromCore.String() == "ACTIVE":
+				logger.Debug(ctx, "mismatch between port state in voltha core and raising port down event", log.Fields{
+					"portStateFromOlt":       portStateFromOlt.String(),
+					"portOperStatusFromCore": portOperStatusFromCore.String(),
+					"device-id":              dh.device.Id,
+					"port":                   portNum})
+				dh.updateIntfOperStateAndRaiseIndication(ctx, "down", intfID)
+			case portStateFromOlt.String() == "DISABLED" && portAdminStateFromCore.String() == "ENABLED":
+				logger.Error(ctx, "port enabled in device but disabled at votlha core", log.Fields{
+					"device-id": dh.device.Id,
+					"port":      portNum})
+			default:
+				logger.Error(ctx, "mismatch between port state in voltha core and voltha device", log.Fields{
+					"portStateFromOlt":       portStateFromOlt.String(),
+					"portOperStatusFromCore": portOperStatusFromCore.String(),
+					"device-id":              dh.device.Id,
+					"port":                   portNum})
+
+			}
+
+		}
+
+	}
+
+	return nil
+}
+
 // doStateUp handle the olt up indication and update to voltha core
 func (dh *DeviceHandler) doStateUp(ctx context.Context) error {
 	//starting the stat collector
 	go startCollector(ctx, dh)
-
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
+	if err == nil {
+		if device.OperStatus == voltha.OperStatus_RECONCILING {
+			err = dh.reconcileOnus(ctx)
+			if err != nil {
+				logger.Error(ctx, "unable to reconcile onu", log.Fields{"eeror": err})
+			}
+			err = dh.reconcilePonPorts(ctx)
+			if err != nil {
+				logger.Error(ctx, "unable to reconcile pon ports", log.Fields{"eeror": err})
+			}
+		}
+	}
 	// instantiate the mcast handler routines.
 	for i := range dh.incomingMcastFlowOrGroup {
 		// We land inside the below "if" code path, after the OLT comes back from a reboot, otherwise the routines
@@ -3400,6 +3576,28 @@
 
 }
 
+func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
+
+	Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
+	OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
+	if err != nil {
+		return nil, err
+	}
+	return OnuInfo, nil
+
+}
+
+func (dh *DeviceHandler) getIntfInfo(ctx context.Context, intfID uint32) (*oop.PonIntfInfo, error) {
+
+	Intf := oop.Interface{IntfId: intfID}
+	IntfInfo, err := dh.Client.GetPonInterfaceInfo(ctx, &Intf)
+	if err != nil {
+		return nil, err
+	}
+	return IntfInfo, nil
+
+}
+
 func (dh *DeviceHandler) getRxPower(ctx context.Context, rxPowerRequest *extension.GetRxPowerRequest) *extension.SingleGetValueResponse {
 
 	Onu := oop.Onu{IntfId: rxPowerRequest.IntfId, OnuId: rxPowerRequest.OnuId}
@@ -3679,6 +3877,16 @@
 	return cClient.GetDevicePort(subCtx, portFilter)
 }
 
+func (dh *DeviceHandler) getAllPortsFromCore(ctx context.Context, portFilter *ca.PortFilter) (*voltha.Ports, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil || cClient == nil {
+		return nil, err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.GetPorts(subCtx, portFilter)
+}
+
 /*
 Helper functions to communicate with child adapter
 */
diff --git a/pkg/mocks/mockOpenOltClient.go b/pkg/mocks/mockOpenOltClient.go
old mode 100644
new mode 100755