reconcile onus and pon ports (VOL-4972)
Change-Id: Ic5d08180c34a291579f35c9608f5a9f05b93d3e2
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
old mode 100644
new mode 100755
index 66e2133..3eb3a08
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -772,11 +772,187 @@
}
}
+func generateOnuIndication(intfID, onuID uint32, operState, adminState string) *oop.Indication {
+ onuInd := &oop.OnuIndication{
+ IntfId: intfID,
+ OnuId: onuID,
+ OperState: operState,
+ AdminState: adminState,
+ }
+ indication := &oop.Indication{
+ Data: &oop.Indication_OnuInd{
+ OnuInd: onuInd,
+ },
+ }
+ return indication
+}
+
+func generateOnuAlarmIndication(intfID uint32, onuID uint32, losStatus string) *oop.AlarmIndication {
+ onuAlarmInd := &oop.OnuAlarmIndication{
+ IntfId: intfID,
+ OnuId: onuID,
+ LosStatus: losStatus,
+ }
+ alarmInd := &oop.AlarmIndication{
+ Data: &oop.AlarmIndication_OnuAlarmInd{
+ OnuAlarmInd: onuAlarmInd,
+ },
+ }
+ return alarmInd
+}
+func generatePonLosAlarmIndication(intfID uint32, losStatus string) *oop.AlarmIndication {
+
+ ponlosAlarmInd := &oop.LosIndication{
+ IntfId: intfID,
+ Status: losStatus,
+ }
+ alarmInd := &oop.AlarmIndication{
+ Data: &oop.AlarmIndication_LosInd{
+ LosInd: ponlosAlarmInd,
+ },
+ }
+ return alarmInd
+}
+func (dh *DeviceHandler) updateIntfOperStateAndRaiseIndication(ctx context.Context, operState string, intfID uint32) {
+ go func() {
+ if err := dh.addPort(ctx, intfID, voltha.Port_PON_OLT, operState, defaultPortSpeedMbps); err != nil {
+ _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-pon", "device-id": dh.device.Id}, err).Log()
+ }
+ }()
+
+ raisedTs := time.Now().Unix()
+ go dh.eventMgr.oltIntfOperIndication(ctx, &oop.IntfOperIndication{Type: "pon", IntfId: intfID, OperState: operState}, dh.device.Id, raisedTs)
+}
+
+func (dh *DeviceHandler) reconcileOnus(ctx context.Context) error {
+
+ onuDevicesFromCore, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
+ if err != nil {
+
+ logger.Error(ctx, "unable to fetch child device", log.Fields{"eeror": err})
+
+ return err
+ }
+ for _, onuDeviceFromCore := range onuDevicesFromCore.Items {
+
+ onuOperStatusFromCore := onuDeviceFromCore.OperStatus
+ onuConnectStatusFromCore := onuDeviceFromCore.ConnectStatus
+ intfID := plt.PortNoToIntfID(onuDeviceFromCore.ParentPortNo, voltha.Port_PON_OLT)
+
+ onuID := onuDeviceFromCore.ProxyAddress.OnuId
+ onuDeviceFromOlt, err := dh.getOnuInfo(ctx, intfID, &onuID)
+ if err != nil {
+ logger.Error(ctx, "unable to get onu object from olt agent", log.Fields{"eeror": err})
+
+ } else {
+ onuOperStatusFromOlt := onuDeviceFromOlt.GetState()
+ onuLosFromOlt := onuDeviceFromOlt.GetLosi()
+ switch {
+ case onuOperStatusFromOlt.String() == "ACTIVE" && onuOperStatusFromCore.String() != "ACTIVE":
+ OnuIndication := generateOnuIndication(intfID, onuID, "up", "up")
+ dh.putOnuIndicationToChannel(ctx, OnuIndication, intfID)
+
+ case onuLosFromOlt.String() == "ON" && onuConnectStatusFromCore.String() == "REACHABLE":
+ OnuIndication := generateOnuIndication(intfID, onuID, "down", "down") //check bal cli login notepad
+ alarmInd := generateOnuAlarmIndication(intfID, onuID, "on")
+ raisedTs := time.Now().Unix()
+ go dh.eventMgr.ProcessEvents(ctx, alarmInd, dh.device.Id, raisedTs)
+
+ dh.putOnuIndicationToChannel(ctx, OnuIndication, intfID)
+ }
+
+ }
+
+ }
+
+ return nil
+}
+
+func (dh *DeviceHandler) reconcilePonPorts(ctx context.Context) error { // need onuid and pon id
+ portsFromCore, err := dh.getAllPortsFromCore(ctx, &ca.PortFilter{
+ DeviceId: dh.device.Id,
+ PortType: voltha.Port_PON_OLT,
+ })
+ if err != nil {
+ logger.Error(ctx, "unable to fetch ports from core", log.Fields{"eeror": err})
+
+ return err
+
+ }
+ for _, portFromCore := range portsFromCore.Items {
+ portNum := portFromCore.GetPortNo()
+ intfID := plt.PortNoToIntfID(portNum, voltha.Port_PON_OLT)
+ portOperStatusFromCore := portFromCore.OperStatus
+ portAdminStateFromCore := portFromCore.AdminState
+ ponPortFromOlt, err := dh.getIntfInfo(ctx, intfID)
+ if err != nil {
+ logger.Error(ctx, "unable to get pon objects from olt agent", log.Fields{"eeror": err})
+ } else {
+ portLosFromOlt := ponPortFromOlt.GetLos()
+ portStateFromOlt := ponPortFromOlt.GetState()
+ if portOperStatusFromCore.String() == "ACTIVE" && portLosFromOlt.String() == "ON" {
+ logger.Debug(ctx, "port is active in core but los is fired from olt", log.Fields{
+ "portStateFromOlt": portStateFromOlt.String(),
+ "portOperStatusFromCore": portOperStatusFromCore.String(),
+ "device-id": dh.device.Id,
+ "port": portNum})
+ ponLosindication := generatePonLosAlarmIndication(intfID, "on")
+ raisedTs := time.Now().Unix()
+ go dh.eventMgr.ProcessEvents(ctx, ponLosindication, dh.device.Id, raisedTs)
+
+ }
+ switch {
+ case portStateFromOlt.String() == "ACTIVE_WORKING" && portOperStatusFromCore.String() != "ACTIVE":
+ logger.Debug(ctx, "mismatch between port state in voltha core and raising port up event", log.Fields{
+ "portStateFromOlt": portStateFromOlt.String(),
+ "portOperStatusFromCore": portOperStatusFromCore.String(),
+ "device-id": dh.device.Id,
+ "port": portNum})
+ dh.updateIntfOperStateAndRaiseIndication(ctx, "up", intfID)
+ case (portStateFromOlt.String() == "INACTIVE" || portStateFromOlt.String() == "UNKNOWN") && portOperStatusFromCore.String() == "ACTIVE":
+ logger.Debug(ctx, "mismatch between port state in voltha core and raising port down event", log.Fields{
+ "portStateFromOlt": portStateFromOlt.String(),
+ "portOperStatusFromCore": portOperStatusFromCore.String(),
+ "device-id": dh.device.Id,
+ "port": portNum})
+ dh.updateIntfOperStateAndRaiseIndication(ctx, "down", intfID)
+ case portStateFromOlt.String() == "DISABLED" && portAdminStateFromCore.String() == "ENABLED":
+ logger.Error(ctx, "port enabled in device but disabled at votlha core", log.Fields{
+ "device-id": dh.device.Id,
+ "port": portNum})
+ default:
+ logger.Error(ctx, "mismatch between port state in voltha core and voltha device", log.Fields{
+ "portStateFromOlt": portStateFromOlt.String(),
+ "portOperStatusFromCore": portOperStatusFromCore.String(),
+ "device-id": dh.device.Id,
+ "port": portNum})
+
+ }
+
+ }
+
+ }
+
+ return nil
+}
+
// doStateUp handle the olt up indication and update to voltha core
func (dh *DeviceHandler) doStateUp(ctx context.Context) error {
//starting the stat collector
go startCollector(ctx, dh)
-
+ device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
+ if err == nil {
+ if device.OperStatus == voltha.OperStatus_RECONCILING {
+ err = dh.reconcileOnus(ctx)
+ if err != nil {
+ logger.Error(ctx, "unable to reconcile onu", log.Fields{"eeror": err})
+ }
+ err = dh.reconcilePonPorts(ctx)
+ if err != nil {
+ logger.Error(ctx, "unable to reconcile pon ports", log.Fields{"eeror": err})
+ }
+ }
+ }
// instantiate the mcast handler routines.
for i := range dh.incomingMcastFlowOrGroup {
// We land inside the below "if" code path, after the OLT comes back from a reboot, otherwise the routines
@@ -3400,6 +3576,28 @@
}
+func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
+
+ Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
+ OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
+ if err != nil {
+ return nil, err
+ }
+ return OnuInfo, nil
+
+}
+
+func (dh *DeviceHandler) getIntfInfo(ctx context.Context, intfID uint32) (*oop.PonIntfInfo, error) {
+
+ Intf := oop.Interface{IntfId: intfID}
+ IntfInfo, err := dh.Client.GetPonInterfaceInfo(ctx, &Intf)
+ if err != nil {
+ return nil, err
+ }
+ return IntfInfo, nil
+
+}
+
func (dh *DeviceHandler) getRxPower(ctx context.Context, rxPowerRequest *extension.GetRxPowerRequest) *extension.SingleGetValueResponse {
Onu := oop.Onu{IntfId: rxPowerRequest.IntfId, OnuId: rxPowerRequest.OnuId}
@@ -3679,6 +3877,16 @@
return cClient.GetDevicePort(subCtx, portFilter)
}
+func (dh *DeviceHandler) getAllPortsFromCore(ctx context.Context, portFilter *ca.PortFilter) (*voltha.Ports, error) {
+ cClient, err := dh.coreClient.GetCoreServiceClient()
+ if err != nil || cClient == nil {
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ defer cancel()
+ return cClient.GetPorts(subCtx, portFilter)
+}
+
/*
Helper functions to communicate with child adapter
*/
diff --git a/pkg/mocks/mockOpenOltClient.go b/pkg/mocks/mockOpenOltClient.go
old mode 100644
new mode 100755