[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index cfe99c9..2c5a4c8 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -25,25 +25,23 @@
"sync"
"time"
- "github.com/opencord/voltha-protos/v4/go/tech_profile"
+ "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
+ "github.com/opencord/voltha-protos/v5/go/tech_profile"
"github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/looplab/fsm"
me "github.com/opencord/omci-lib-go/generated"
- "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v5/pkg/db"
- "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
- flow "github.com/opencord/voltha-lib-go/v5/pkg/flows"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- vc "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/extension"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- "github.com/opencord/voltha-protos/v4/go/openflow_13"
- of "github.com/opencord/voltha-protos/v4/go/openflow_13"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- oop "github.com/opencord/voltha-protos/v4/go/openolt"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+ flow "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ vc "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/v5/go/openolt"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
// Constants for timeouts
@@ -178,11 +176,11 @@
parentID string
ponPortNumber uint32
- coreProxy adapterif.CoreProxy
- AdapterProxy adapterif.AdapterProxy
- EventProxy eventif.EventProxy
+ coreClient *vgrpc.Client
+ EventProxy eventif.EventProxy
pmConfigs *voltha.PmConfigs
+ config *config.AdapterFlags
pOpenOnuAc *OpenONUAC
pDeviceStateFsm *fsm.FSM
@@ -237,11 +235,11 @@
}
//newDeviceHandler creates a new device handler
-func newDeviceHandler(ctx context.Context, cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *deviceHandler {
+func newDeviceHandler(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *deviceHandler {
var dh deviceHandler
- dh.coreProxy = cp
- dh.AdapterProxy = ap
+ dh.coreClient = cc
dh.EventProxy = ep
+ dh.config = adapter.config
cloned := (proto.Clone(device)).(*voltha.Device)
dh.deviceID = cloned.Id
dh.DeviceType = cloned.Type
@@ -327,7 +325,7 @@
//adoptOrReconcileDevice adopts the ONU device
func (dh *deviceHandler) adoptOrReconcileDevice(ctx context.Context, device *voltha.Device) {
- logger.Debugw(ctx, "Adopt_or_reconcile_device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
+ logger.Debugw(ctx, "adopt_or_reconcile_device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
logger.Debugw(ctx, "Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
if dh.pDeviceStateFsm.Is(devStNull) {
@@ -338,7 +336,7 @@
// device.PmConfigs is not nil in cases when adapter restarts. We should not re-set the core again.
if device.PmConfigs == nil {
// Now, set the initial PM configuration for that device
- if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.pmConfigs); err != nil {
+ if err := dh.updatePMConfigInCore(ctx, dh.pmConfigs); err != nil {
logger.Errorw(ctx, "error updating pm config to core", log.Fields{"device-id": dh.deviceID, "err": err})
}
}
@@ -348,36 +346,26 @@
}
-func (dh *deviceHandler) processInterAdapterOMCIReceiveMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
- msgBody := msg.GetBody()
- omciMsg := &ic.InterAdapterOmciMessage{}
- if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-omci-msg-body", log.Fields{
- "device-id": dh.deviceID, "error": err})
- return err
- }
-
+func (dh *deviceHandler) handleOMCIIndication(ctx context.Context, msg *ic.OmciMessage) error {
/* msg print moved symmetrically to omci_cc, if wanted here as additional debug, than perhaps only based on additional debug setting!
//assuming omci message content is hex coded!
// with restricted output of 16(?) bytes would be ...omciMsg.Message[:16]
logger.Debugw(ctx, "inter-adapter-recv-omci", log.Fields{
"device-id": dh.deviceID, "RxOmciMessage": hex.EncodeToString(omciMsg.Message)})
*/
+
pDevEntry := dh.getOnuDeviceEntry(ctx, true)
if pDevEntry != nil {
if pDevEntry.PDevOmciCC != nil {
- return pDevEntry.PDevOmciCC.receiveMessage(log.WithSpanFromContext(context.TODO(), ctx), omciMsg.Message)
+ return pDevEntry.PDevOmciCC.receiveMessage(log.WithSpanFromContext(context.TODO(), ctx), msg.Message)
}
- logger.Debugw(ctx, "omciCC not ready to receive omci messages - incoming omci message ignored", log.Fields{"rxMsg": omciMsg.Message})
+ logger.Debugw(ctx, "omciCC not ready to receive omci messages - incoming omci message ignored", log.Fields{"rxMsg": msg.Message})
}
logger.Errorw(ctx, "No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
return fmt.Errorf("no valid OnuDevice: %s", dh.deviceID)
}
-func (dh *deviceHandler) processInterAdapterTechProfileDownloadReqMessage(
- ctx context.Context,
- msg *ic.InterAdapterMessage) error {
-
+func (dh *deviceHandler) handleTechProfileDownloadRequest(ctx context.Context, techProfMsg *ic.TechProfileDownloadMessage) error {
logger.Infow(ctx, "tech-profile-download-request", log.Fields{"device-id": dh.deviceID})
pDevEntry := dh.getOnuDeviceEntry(ctx, true)
@@ -400,14 +388,6 @@
// at least 'mib-downloaded' should be reached for processing of this specific ONU configuration
// if (dh.deviceReason == "stopping-openomci") || (dh.deviceReason == "omci-admin-lock")
- msgBody := msg.GetBody()
- techProfMsg := &ic.InterAdapterTechProfileDownloadMessage{}
- if err := ptypes.UnmarshalAny(msgBody, techProfMsg); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-techprof-msg-body", log.Fields{
- "device-id": dh.deviceID, "error": err})
- return err
- }
-
// we have to lock access to TechProfile processing based on different messageType calls or
// even to fast subsequent calls of the same messageType as well as OnuKVStore processing due
// to possible concurrent access by flow processing
@@ -429,7 +409,7 @@
if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.TpInstancePath); bTpModify {
switch tpInst := techProfMsg.TechTpInstance.(type) {
- case *ic.InterAdapterTechProfileDownloadMessage_TpInstance:
+ case *ic.TechProfileDownloadMessage_TpInstance:
logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
// if there has been some change for some uni TechProfilePath
//in order to allow concurrent calls to other dh instances we do not wait for execution here
@@ -475,9 +455,8 @@
return nil
}
-func (dh *deviceHandler) processInterAdapterDeleteGemPortReqMessage(
- ctx context.Context,
- msg *ic.InterAdapterMessage) error {
+func (dh *deviceHandler) handleDeleteGemPortRequest(ctx context.Context, delGemPortMsg *ic.DeleteGemPortMessage) error {
+ logger.Infow(ctx, "delete-gem-port-request", log.Fields{"device-id": dh.deviceID})
if dh.pOnuTP == nil {
//should normally not happen ...
@@ -485,15 +464,6 @@
log.Fields{"device-id": dh.deviceID})
return fmt.Errorf("techProfile DelGem request while onuTechProf instance not setup: %s", dh.deviceID)
}
-
- msgBody := msg.GetBody()
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{}
- if err := ptypes.UnmarshalAny(msgBody, delGemPortMsg); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-delete-gem-msg-body", log.Fields{
- "device-id": dh.deviceID, "error": err})
- return err
- }
-
//compare TECH_PROFILE_DOWNLOAD_REQUEST
dh.pOnuTP.lockTpProcMutex()
defer dh.pOnuTP.unlockTpProcMutex()
@@ -516,9 +486,8 @@
}
-func (dh *deviceHandler) processInterAdapterDeleteTcontReqMessage(
- ctx context.Context,
- msg *ic.InterAdapterMessage) error {
+func (dh *deviceHandler) handleDeleteTcontRequest(ctx context.Context, delTcontMsg *ic.DeleteTcontMessage) error {
+ logger.Infow(ctx, "delete-tcont-request", log.Fields{"device-id": dh.deviceID})
pDevEntry := dh.getOnuDeviceEntry(ctx, true)
if pDevEntry == nil {
@@ -532,14 +501,6 @@
return fmt.Errorf("techProfile DelTcont request while onuTechProf instance not setup: %s", dh.deviceID)
}
- msgBody := msg.GetBody()
- delTcontMsg := &ic.InterAdapterDeleteTcontMessage{}
- if err := ptypes.UnmarshalAny(msgBody, delTcontMsg); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-delete-tcont-msg-body", log.Fields{
- "device-id": dh.deviceID, "error": err})
- return err
- }
-
//compare TECH_PROFILE_DOWNLOAD_REQUEST
dh.pOnuTP.lockTpProcMutex()
defer dh.pOnuTP.unlockTpProcMutex()
@@ -616,52 +577,10 @@
return nil
}
-//processInterAdapterMessage sends the proxied messages to the target device
-// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
-// is meant, and then send the unmarshalled omci message to this onu
-func (dh *deviceHandler) processInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
- msgID := msg.Header.Id
- msgType := msg.Header.Type
- fromTopic := msg.Header.FromTopic
- toTopic := msg.Header.ToTopic
- toDeviceID := msg.Header.ToDeviceId
- proxyDeviceID := msg.Header.ProxyDeviceId
- logger.Debugw(ctx, "InterAdapter message header", log.Fields{"msgID": msgID, "msgType": msgType,
- "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-
- switch msgType {
- // case ic.InterAdapterMessageType_ONU_IND_REQUEST: was handled by OpenONUAC already - see comments there
- //OMCI_RESPONSE also accepted acc. to VOL-3756 (OMCI_REQUEST request was legacy code)
- case ic.InterAdapterMessageType_OMCI_RESPONSE, ic.InterAdapterMessageType_OMCI_REQUEST:
- {
- return dh.processInterAdapterOMCIReceiveMessage(ctx, msg)
- }
- case ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST:
- {
- return dh.processInterAdapterTechProfileDownloadReqMessage(ctx, msg)
- }
- case ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST:
- {
- return dh.processInterAdapterDeleteGemPortReqMessage(ctx, msg)
-
- }
- case ic.InterAdapterMessageType_DELETE_TCONT_REQUEST:
- {
- return dh.processInterAdapterDeleteTcontReqMessage(ctx, msg)
- }
- default:
- {
- logger.Errorw(ctx, "inter-adapter-unhandled-type", log.Fields{
- "msgType": msg.Header.Type, "device-id": dh.deviceID})
- return fmt.Errorf("inter-adapter-unhandled-type: %d, %s", msg.Header.Type, dh.deviceID)
- }
- }
-}
-
//FlowUpdateIncremental removes and/or adds the flow changes on a given device
func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
- apOfFlowChanges *openflow_13.FlowChanges,
- apOfGroupChanges *openflow_13.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
+ apOfFlowChanges *of.FlowChanges,
+ apOfGroupChanges *of.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.deviceID, "metadata": apFlowMetaData})
var retError error = nil
//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
@@ -806,8 +725,11 @@
} else {
logger.Debugw(ctx, "DeviceStateUpdate upon disable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_UNKNOWN, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx),
- dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
}
@@ -914,10 +836,16 @@
techProfsFound = true // set to true if we found TP once for any UNI port
for tpID := range uniData.PersTpPathMap {
// Request the TpInstance again from the openolt adapter in case of reconcile
- iaTechTpInst, err := dh.AdapterProxy.TechProfileInstanceRequest(ctx, uniData.PersTpPathMap[tpID],
- dh.device.ParentPortNo, dh.device.ProxyAddress.OnuId, uint32(uniData.PersUniID),
- dh.pOpenOnuAc.config.Topic, dh.ProxyAddressType,
- dh.parentID, dh.ProxyAddressID)
+ iaTechTpInst, err := dh.getTechProfileInstanceFromParentAdapter(ctx,
+ dh.device.ProxyAddress.AdapterEndpoint,
+ &ic.TechProfileInstanceRequestMessage{
+ DeviceId: dh.device.Id,
+ TpInstancePath: uniData.PersTpPathMap[tpID],
+ ParentDeviceId: dh.parentID,
+ ParentPonPort: dh.device.ParentPortNo,
+ OnuId: dh.device.ProxyAddress.OnuId,
+ UniId: uint32(uniData.PersUniID),
+ })
if err != nil || iaTechTpInst == nil {
logger.Errorw(ctx, "error fetching tp instance",
log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID, "err": err})
@@ -926,7 +854,7 @@
}
var tpInst tech_profile.TechProfileInstance
switch techTpInst := iaTechTpInst.TechTpInstance.(type) {
- case *ic.InterAdapterTechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
+ case *ic.TechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
tpInst = *techTpInst.TpInstance
logger.Debugw(ctx, "received-tp-instance-successfully-after-reconcile", log.Fields{
"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
@@ -1152,8 +1080,11 @@
logger.Debugw(ctx, "call DeviceStateUpdate upon reboot", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_DISCOVERED, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.ConnectStatus_REACHABLE,
- voltha.OperStatus_DISCOVERED); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ OperStatus: voltha.OperStatus_DISCOVERED,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
return
@@ -1294,8 +1225,7 @@
dh.lockUpgradeFsm.RLock()
if dh.pOnuUpradeFsm != nil {
dh.lockUpgradeFsm.RUnlock()
- onuVolthaDevice, getErr := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx),
- dh.deviceID, dh.deviceID)
+ onuVolthaDevice, getErr := dh.getDeviceFromCore(ctx, dh.deviceID)
if getErr != nil || onuVolthaDevice == nil {
logger.Errorw(ctx, "Failed to fetch Onu device for image activation", log.Fields{"device-id": dh.deviceID, "err": getErr})
return nil, fmt.Errorf("could not fetch device for device-id: %s", dh.deviceID)
@@ -1358,8 +1288,7 @@
dh.lockUpgradeFsm.RLock()
if dh.pOnuUpradeFsm != nil {
dh.lockUpgradeFsm.RUnlock()
- onuVolthaDevice, getErr := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx),
- dh.deviceID, dh.deviceID)
+ onuVolthaDevice, getErr := dh.getDeviceFromCore(ctx, dh.deviceID)
if getErr != nil || onuVolthaDevice == nil {
logger.Errorw(ctx, "Failed to fetch Onu device for image commitment", log.Fields{"device-id": dh.deviceID, "err": getErr})
return nil, fmt.Errorf("could not fetch device for device-id: %s", dh.deviceID)
@@ -1513,7 +1442,9 @@
if !dh.isReconciling() {
logger.Infow(ctx, "DeviceUpdate", log.Fields{"deviceReason": dh.device.Reason, "device-id": dh.deviceID})
- _ = dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device)
+ if err := dh.updateDeviceInCore(ctx, dh.device); err != nil {
+ logger.Errorw(ctx, "device-update-failed", log.Fields{"device-id": dh.device.Id, "error": err})
+ }
//TODO Need to Update Device Reason To CORE as part of device update userstory
} else {
logger.Debugw(ctx, "reconciling - don't notify core about DeviceUpdate",
@@ -1547,6 +1478,7 @@
}
pPonPort := &voltha.Port{
+ DeviceId: dh.deviceID,
PortNo: ponPortNo,
Label: fmt.Sprintf("pon-%d", ponPortNo),
Type: voltha.Port_PON_ONU,
@@ -1554,7 +1486,7 @@
Peers: []*voltha.Port_PeerPort{{DeviceId: dh.parentID, // Peer device is OLT
PortNo: ponPortNo}}, // Peer port is parent's port number
}
- if err = dh.coreProxy.PortCreated(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, pPonPort); err != nil {
+ if err = dh.createPortInCore(ctx, pPonPort); err != nil {
logger.Fatalf(ctx, "Device FSM: PortCreated-failed-%s", err)
e.Cancel(err)
return
@@ -1814,8 +1746,12 @@
}
logger.Debugw(ctx, "call DeviceStateUpdate upon create interface", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_ACTIVATING, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
- voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING); err != nil {
+
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ OperStatus: voltha.OperStatus_ACTIVATING,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
}
@@ -2031,8 +1967,11 @@
}
logger.Debugw(ctx, "call DeviceStateUpdate upon update interface", log.Fields{"ConnectStatus": voltha.ConnectStatus_UNREACHABLE,
"OperStatus": voltha.OperStatus_DISCOVERED, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
- voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_DISCOVERED); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_UNREACHABLE,
+ OperStatus: voltha.OperStatus_DISCOVERED,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state unreachable-discovered",
log.Fields{"device-id": dh.deviceID, "error": err})
@@ -2208,8 +2147,11 @@
// in case of adapter restart connected to an ONU upgrade I would not rely on the image quality
// maybe some 'forced' commitment can be done in this situation from system management (or upgrade restarted)
dh.checkOnOnuImageCommit(ctx)
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
- voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
} else {
@@ -2292,8 +2234,12 @@
func (dh *deviceHandler) processUniDisableStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
logger.Debugw(ctx, "DeviceStateUpdate upon disable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_UNKNOWN, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx),
- dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
}
@@ -2322,8 +2268,11 @@
func (dh *deviceHandler) processUniEnableStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
logger.Debugw(ctx, "DeviceStateUpdate upon re-enable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_ACTIVE, "device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.ConnectStatus_REACHABLE,
- voltha.OperStatus_ACTIVE); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ }); err != nil {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing
logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
}
@@ -2537,7 +2486,16 @@
uniPort.setOperState(vc.OperStatus_ACTIVE)
if !dh.isReconciling() {
//maybe also use getter functions on uniPort - perhaps later ...
- go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
+ go func(port *onuUniPort) {
+ if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+ DeviceId: dh.deviceID,
+ PortType: voltha.Port_ETHERNET_UNI,
+ PortNo: port.portNo,
+ OperStatus: port.operState,
+ }); err != nil {
+ logger.Errorw(ctx, "port-state-update-failed", log.Fields{"error": err, "port-no": uniPort.portNo, "device-id": dh.deviceID})
+ }
+ }(uniPort)
} else {
logger.Debugw(ctx, "reconciling - don't notify core about PortStateUpdate", log.Fields{"device-id": dh.deviceID})
}
@@ -2557,7 +2515,16 @@
uniPort.setOperState(vc.OperStatus_UNKNOWN)
if !dh.isReconciling() {
//maybe also use getter functions on uniPort - perhaps later ...
- go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
+ go func(port *onuUniPort) {
+ if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+ DeviceId: dh.deviceID,
+ PortType: voltha.Port_ETHERNET_UNI,
+ PortNo: port.portNo,
+ OperStatus: port.operState,
+ }); err != nil {
+ logger.Errorw(ctx, "port-state-update-failed", log.Fields{"error": err, "port-no": uniPort.portNo, "device-id": dh.deviceID})
+ }
+ }(uniPort)
} else {
logger.Debugw(ctx, "reconciling - don't notify core about PortStateUpdate", log.Fields{"device-id": dh.deviceID})
}
@@ -2573,7 +2540,7 @@
eventContext := make(map[string]string)
//Populating event context
// assume giving ParentId in GetDevice twice really gives the ParentDevice (there is no GetParentDevice()...)
- parentDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.parentID, dh.parentID)
+ parentDevice, err := dh.getDeviceFromCore(ctx, dh.parentID)
if err != nil || parentDevice == nil {
logger.Errorw(ctx, "Failed to fetch parent device for OnuEvent",
log.Fields{"parentID": dh.parentID, "err": err})
@@ -2853,7 +2820,7 @@
return kvbackend
}
-func (dh *deviceHandler) getFlowOfbFields(ctx context.Context, apFlowItem *ofp.OfpFlowStats, loMatchVlan *uint16,
+func (dh *deviceHandler) getFlowOfbFields(ctx context.Context, apFlowItem *of.OfpFlowStats, loMatchVlan *uint16,
loAddPcp *uint8, loIPProto *uint32) {
for _, field := range flow.GetOfbFields(apFlowItem) {
@@ -2930,7 +2897,7 @@
} //for all OfbFields
}
-func (dh *deviceHandler) getFlowActions(ctx context.Context, apFlowItem *ofp.OfpFlowStats, loSetPcp *uint8, loSetVlan *uint16) {
+func (dh *deviceHandler) getFlowActions(ctx context.Context, apFlowItem *of.OfpFlowStats, loSetPcp *uint8, loSetVlan *uint16) {
for _, action := range flow.GetActions(apFlowItem) {
switch action.Type {
/* not used:
@@ -2976,7 +2943,7 @@
}
//addFlowItemToUniPort parses the actual flow item to add it to the UniPort
-func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort,
+func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *onuUniPort,
apFlowMetaData *voltha.FlowMetadata) error {
var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE) //noValidEntry
var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
@@ -3067,7 +3034,7 @@
}
//removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
-func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort) error {
+func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *onuUniPort) error {
//optimization and assumption: the flow cookie uniquely identifies the flow and with that the internal rule
//hence only the cookie is used here to find the relevant flow and possibly remove the rule
//no extra check is done on the rule parameters
@@ -3300,7 +3267,10 @@
dh.setDeviceReason(deviceReason)
if notifyCore {
//TODO with VOL-3045/VOL-3046: return the error and stop further processing at calling position
- if err := dh.coreProxy.DeviceReasonUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, deviceReasonMap[deviceReason]); err != nil {
+ if err := dh.updateDeviceReasonInCore(ctx, &ic.DeviceReason{
+ DeviceId: dh.deviceID,
+ Reason: deviceReasonMap[deviceReason],
+ }); err != nil {
logger.Errorf(ctx, "DeviceReasonUpdate error: %s",
log.Fields{"device-id": dh.deviceID, "error": err}, deviceReasonMap[deviceReason])
return err
@@ -3715,7 +3685,11 @@
}
logger.Debugw(ctx, "reconciling has been finished in time",
log.Fields{"device-id": dh.deviceID})
- if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.deviceID, connectStatus, operState); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: connectStatus,
+ OperStatus: operState,
+ }); err != nil {
logger.Errorw(ctx, "unable to update device state to core",
log.Fields{"device-id": dh.deviceID, "Err": err})
}
@@ -3830,8 +3804,130 @@
}
logger.Debugw(ctx, "Core DeviceStateUpdate", log.Fields{"connectStatus": connectStatus, "operState": voltha.OperStatus_RECONCILING_FAILED})
- if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.deviceID, connectStatus, voltha.OperStatus_RECONCILING_FAILED); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.deviceID,
+ ConnStatus: connectStatus,
+ OperStatus: voltha.OperStatus_RECONCILING_FAILED,
+ }); err != nil {
logger.Errorw(ctx, "unable to update device state to core",
log.Fields{"device-id": dh.deviceID, "Err": err})
}
}
+
+/*
+Helper functions to communicate with Core
+*/
+
+// getDeviceFromCore fetches the voltha.Device identified by deviceID from the core.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) rather than at ctx itself, and it is bounded by
+// dh.config.RPCTimeout.
+//
+// It returns the error reported by GetCoreServiceClient or by the GetDevice call.
+// A nil core client without an accompanying error is reported as an error as well,
+// so that the function never returns (nil, nil) because of a missing client.
+func (dh *deviceHandler) getDeviceFromCore(ctx context.Context, deviceID string) (*voltha.Device, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// no client and no error: make the failure visible to the caller
+		return nil, fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	logger.Debugw(subCtx, "get-device-from-core", log.Fields{"device-id": deviceID})
+	return cClient.GetDevice(subCtx, &vc.ID{Id: deviceID})
+}
+
+// updateDeviceStateInCore sends deviceStateFilter to the core via DeviceStateUpdate.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a skipped
+// update would be indistinguishable from a successful one.
+func (dh *deviceHandler) updateDeviceStateInCore(ctx context.Context, deviceStateFilter *ic.DeviceStateFilter) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped update as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceStateUpdate(subCtx, deviceStateFilter)
+	logger.Debugw(subCtx, "device-updated-in-core", log.Fields{"device-state": deviceStateFilter, "error": err})
+	return err
+}
+
+// updatePMConfigInCore sends pmConfigs to the core via DevicePMConfigUpdate.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a skipped
+// update would be indistinguishable from a successful one.
+func (dh *deviceHandler) updatePMConfigInCore(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped update as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DevicePMConfigUpdate(subCtx, pmConfigs)
+	logger.Debugw(subCtx, "pmconfig-updated-in-core", log.Fields{"pm-configs": pmConfigs, "error": err})
+	return err
+}
+
+// updateDeviceInCore sends the given device to the core via DeviceUpdate.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a skipped
+// update would be indistinguishable from a successful one.
+func (dh *deviceHandler) updateDeviceInCore(ctx context.Context, device *voltha.Device) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped update as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceUpdate(subCtx, device)
+	logger.Debugw(subCtx, "device-updated-in-core", log.Fields{"device-id": device.Id, "error": err})
+	return err
+}
+
+// createPortInCore announces the given port to the core via PortCreated.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a port that
+// was never created in the core would be reported as created.
+func (dh *deviceHandler) createPortInCore(ctx context.Context, port *voltha.Port) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped creation as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.PortCreated(subCtx, port)
+	logger.Debugw(subCtx, "port-created-in-core", log.Fields{"port": port, "error": err})
+	return err
+}
+
+// updatePortStateInCore sends portState to the core via PortStateUpdate.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a skipped
+// update would be indistinguishable from a successful one.
+func (dh *deviceHandler) updatePortStateInCore(ctx context.Context, portState *ic.PortState) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped update as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.PortStateUpdate(subCtx, portState)
+	logger.Debugw(subCtx, "port-state-updated-in-core", log.Fields{"port-state": portState, "error": err})
+	return err
+}
+
+// updateDeviceReasonInCore sends the device reason to the core via DeviceReasonUpdate.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.RPCTimeout. The outcome of the
+// call, including a possible error, is written to the debug log.
+//
+// It returns the error of GetCoreServiceClient or of the RPC. A nil core client
+// without an accompanying error is returned as an error too; otherwise a skipped
+// update would be indistinguishable from a successful one.
+func (dh *deviceHandler) updateDeviceReasonInCore(ctx context.Context, reason *ic.DeviceReason) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// no client and no error: do not report the skipped update as success
+		return fmt.Errorf("core service client not available: %s", dh.deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceReasonUpdate(subCtx, reason)
+	logger.Debugw(subCtx, "device-reason-updated-in-core", log.Fields{"reason": reason, "error": err})
+	return err
+}
+
+/*
+Helper functions to communicate with parent adapter
+*/
+
+// getTechProfileInstanceFromParentAdapter requests a tech profile instance from the
+// parent adapter reachable at parentEndpoint by calling GetTechProfileInstance with
+// the given request.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.MaxTimeoutInterAdapterComm.
+//
+// It returns the error of getParentAdapterServiceClient or of the RPC. A nil parent
+// adapter client without an accompanying error is reported as an error as well, so
+// that the function never returns (nil, nil) because of a missing client.
+func (dh *deviceHandler) getTechProfileInstanceFromParentAdapter(ctx context.Context, parentEndpoint string,
+	request *ic.TechProfileInstanceRequestMessage) (*ic.TechProfileDownloadMessage, error) {
+	pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+	if err != nil {
+		return nil, err
+	}
+	if pgClient == nil {
+		// no client and no error: make the failure visible to the caller
+		return nil, fmt.Errorf("parent adapter service client not available: %s", parentEndpoint)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+	defer cancel()
+	logger.Debugw(subCtx, "get-tech-profile-instance", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+	return pgClient.GetTechProfileInstance(subCtx, request)
+}
+
+// sendOMCIRequest forwards the OMCI request to the parent adapter reachable at
+// parentEndpoint by calling ProxyOmciRequest.
+//
+// The RPC context is rooted at context.Background() (combined with ctx through
+// log.WithSpanFromContext) and bounded by dh.config.MaxTimeoutInterAdapterComm.
+// A failing RPC is logged on error level together with the request details.
+//
+// It returns the error of getParentAdapterServiceClient or of the RPC. A nil parent
+// adapter client without an accompanying error is returned as an error too;
+// otherwise a request that was never sent would be reported as delivered.
+func (dh *deviceHandler) sendOMCIRequest(ctx context.Context, parentEndpoint string, request *ic.OmciMessage) error {
+	pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+	if err != nil {
+		return err
+	}
+	if pgClient == nil {
+		// no client and no error: do not report the unsent request as success
+		return fmt.Errorf("parent adapter service client not available: %s", parentEndpoint)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+	defer cancel()
+	logger.Debugw(subCtx, "send-omci-request", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+	_, err = pgClient.ProxyOmciRequest(subCtx, request)
+	if err != nil {
+		logger.Errorw(ctx, "omci-failure", log.Fields{"request": request, "error": err, "request-parent": request.ParentDeviceId, "request-child": request.ChildDeviceId, "request-proxy": request.ProxyAddress})
+	}
+	return err
+}