[VOL-4293] OpenONU Adapter update for gRPC migration

Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index cfe99c9..2c5a4c8 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -25,25 +25,23 @@
 	"sync"
 	"time"
 
-	"github.com/opencord/voltha-protos/v4/go/tech_profile"
+	"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
+	"github.com/opencord/voltha-protos/v5/go/tech_profile"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	"github.com/looplab/fsm"
 	me "github.com/opencord/omci-lib-go/generated"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
-	flow "github.com/opencord/voltha-lib-go/v5/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	vc "github.com/opencord/voltha-protos/v4/go/common"
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/openflow_13"
-	of "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	oop "github.com/opencord/voltha-protos/v4/go/openolt"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+	flow "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	vc "github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/extension"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	oop "github.com/opencord/voltha-protos/v5/go/openolt"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 // Constants for timeouts
@@ -178,11 +176,11 @@
 	parentID         string
 	ponPortNumber    uint32
 
-	coreProxy    adapterif.CoreProxy
-	AdapterProxy adapterif.AdapterProxy
-	EventProxy   eventif.EventProxy
+	coreClient *vgrpc.Client
+	EventProxy eventif.EventProxy
 
 	pmConfigs *voltha.PmConfigs
+	config    *config.AdapterFlags
 
 	pOpenOnuAc      *OpenONUAC
 	pDeviceStateFsm *fsm.FSM
@@ -237,11 +235,11 @@
 }
 
 //newDeviceHandler creates a new device handler
-func newDeviceHandler(ctx context.Context, cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *deviceHandler {
+func newDeviceHandler(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *deviceHandler {
 	var dh deviceHandler
-	dh.coreProxy = cp
-	dh.AdapterProxy = ap
+	dh.coreClient = cc
 	dh.EventProxy = ep
+	dh.config = adapter.config
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	dh.deviceID = cloned.Id
 	dh.DeviceType = cloned.Type
@@ -327,7 +325,7 @@
 
 //adoptOrReconcileDevice adopts the ONU device
 func (dh *deviceHandler) adoptOrReconcileDevice(ctx context.Context, device *voltha.Device) {
-	logger.Debugw(ctx, "Adopt_or_reconcile_device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
+	logger.Debugw(ctx, "adopt_or_reconcile_device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
 
 	logger.Debugw(ctx, "Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
 	if dh.pDeviceStateFsm.Is(devStNull) {
@@ -338,7 +336,7 @@
 		// device.PmConfigs is not nil in cases when adapter restarts. We should not re-set the core again.
 		if device.PmConfigs == nil {
 			// Now, set the initial PM configuration for that device
-			if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.pmConfigs); err != nil {
+			if err := dh.updatePMConfigInCore(ctx, dh.pmConfigs); err != nil {
 				logger.Errorw(ctx, "error updating pm config to core", log.Fields{"device-id": dh.deviceID, "err": err})
 			}
 		}
@@ -348,36 +346,26 @@
 
 }
 
-func (dh *deviceHandler) processInterAdapterOMCIReceiveMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
-	msgBody := msg.GetBody()
-	omciMsg := &ic.InterAdapterOmciMessage{}
-	if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
-		logger.Warnw(ctx, "cannot-unmarshal-omci-msg-body", log.Fields{
-			"device-id": dh.deviceID, "error": err})
-		return err
-	}
-
+func (dh *deviceHandler) handleOMCIIndication(ctx context.Context, msg *ic.OmciMessage) error {
 	/* msg print moved symmetrically to omci_cc, if wanted here as additional debug, than perhaps only based on additional debug setting!
 	//assuming omci message content is hex coded!
 	// with restricted output of 16(?) bytes would be ...omciMsg.Message[:16]
 	logger.Debugw(ctx, "inter-adapter-recv-omci", log.Fields{
 		"device-id": dh.deviceID, "RxOmciMessage": hex.EncodeToString(omciMsg.Message)})
 	*/
+
 	pDevEntry := dh.getOnuDeviceEntry(ctx, true)
 	if pDevEntry != nil {
 		if pDevEntry.PDevOmciCC != nil {
-			return pDevEntry.PDevOmciCC.receiveMessage(log.WithSpanFromContext(context.TODO(), ctx), omciMsg.Message)
+			return pDevEntry.PDevOmciCC.receiveMessage(log.WithSpanFromContext(context.TODO(), ctx), msg.Message)
 		}
-		logger.Debugw(ctx, "omciCC not ready to receive omci messages - incoming omci message ignored", log.Fields{"rxMsg": omciMsg.Message})
+		logger.Debugw(ctx, "omciCC not ready to receive omci messages - incoming omci message ignored", log.Fields{"rxMsg": msg.Message})
 	}
 	logger.Errorw(ctx, "No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
 	return fmt.Errorf("no valid OnuDevice: %s", dh.deviceID)
 }
 
-func (dh *deviceHandler) processInterAdapterTechProfileDownloadReqMessage(
-	ctx context.Context,
-	msg *ic.InterAdapterMessage) error {
-
+func (dh *deviceHandler) handleTechProfileDownloadRequest(ctx context.Context, techProfMsg *ic.TechProfileDownloadMessage) error {
 	logger.Infow(ctx, "tech-profile-download-request", log.Fields{"device-id": dh.deviceID})
 
 	pDevEntry := dh.getOnuDeviceEntry(ctx, true)
@@ -400,14 +388,6 @@
 	// at least 'mib-downloaded' should be reached for processing of this specific ONU configuration
 	//  if (dh.deviceReason == "stopping-openomci") || (dh.deviceReason == "omci-admin-lock")
 
-	msgBody := msg.GetBody()
-	techProfMsg := &ic.InterAdapterTechProfileDownloadMessage{}
-	if err := ptypes.UnmarshalAny(msgBody, techProfMsg); err != nil {
-		logger.Warnw(ctx, "cannot-unmarshal-techprof-msg-body", log.Fields{
-			"device-id": dh.deviceID, "error": err})
-		return err
-	}
-
 	// we have to lock access to TechProfile processing based on different messageType calls or
 	// even to fast subsequent calls of the same messageType as well as OnuKVStore processing due
 	// to possible concurrent access by flow processing
@@ -429,7 +409,7 @@
 	if bTpModify := pDevEntry.updateOnuUniTpPath(ctx, uniID, uint8(tpID), techProfMsg.TpInstancePath); bTpModify {
 
 		switch tpInst := techProfMsg.TechTpInstance.(type) {
-		case *ic.InterAdapterTechProfileDownloadMessage_TpInstance:
+		case *ic.TechProfileDownloadMessage_TpInstance:
 			logger.Debugw(ctx, "onu-uni-tp-path-modified", log.Fields{"uniID": uniID, "tp-path": techProfMsg.TpInstancePath, "tpID": tpID})
 			//	if there has been some change for some uni TechProfilePath
 			//in order to allow concurrent calls to other dh instances we do not wait for execution here
@@ -475,9 +455,8 @@
 	return nil
 }
 
-func (dh *deviceHandler) processInterAdapterDeleteGemPortReqMessage(
-	ctx context.Context,
-	msg *ic.InterAdapterMessage) error {
+func (dh *deviceHandler) handleDeleteGemPortRequest(ctx context.Context, delGemPortMsg *ic.DeleteGemPortMessage) error {
+	logger.Infow(ctx, "delete-gem-port-request", log.Fields{"device-id": dh.deviceID})
 
 	if dh.pOnuTP == nil {
 		//should normally not happen ...
@@ -485,15 +464,6 @@
 			log.Fields{"device-id": dh.deviceID})
 		return fmt.Errorf("techProfile DelGem request while onuTechProf instance not setup: %s", dh.deviceID)
 	}
-
-	msgBody := msg.GetBody()
-	delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{}
-	if err := ptypes.UnmarshalAny(msgBody, delGemPortMsg); err != nil {
-		logger.Warnw(ctx, "cannot-unmarshal-delete-gem-msg-body", log.Fields{
-			"device-id": dh.deviceID, "error": err})
-		return err
-	}
-
 	//compare TECH_PROFILE_DOWNLOAD_REQUEST
 	dh.pOnuTP.lockTpProcMutex()
 	defer dh.pOnuTP.unlockTpProcMutex()
@@ -516,9 +486,8 @@
 
 }
 
-func (dh *deviceHandler) processInterAdapterDeleteTcontReqMessage(
-	ctx context.Context,
-	msg *ic.InterAdapterMessage) error {
+func (dh *deviceHandler) handleDeleteTcontRequest(ctx context.Context, delTcontMsg *ic.DeleteTcontMessage) error {
+	logger.Infow(ctx, "delete-tcont-request", log.Fields{"device-id": dh.deviceID})
 
 	pDevEntry := dh.getOnuDeviceEntry(ctx, true)
 	if pDevEntry == nil {
@@ -532,14 +501,6 @@
 		return fmt.Errorf("techProfile DelTcont request while onuTechProf instance not setup: %s", dh.deviceID)
 	}
 
-	msgBody := msg.GetBody()
-	delTcontMsg := &ic.InterAdapterDeleteTcontMessage{}
-	if err := ptypes.UnmarshalAny(msgBody, delTcontMsg); err != nil {
-		logger.Warnw(ctx, "cannot-unmarshal-delete-tcont-msg-body", log.Fields{
-			"device-id": dh.deviceID, "error": err})
-		return err
-	}
-
 	//compare TECH_PROFILE_DOWNLOAD_REQUEST
 	dh.pOnuTP.lockTpProcMutex()
 	defer dh.pOnuTP.unlockTpProcMutex()
@@ -616,52 +577,10 @@
 	return nil
 }
 
-//processInterAdapterMessage sends the proxied messages to the target device
-// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
-// is meant, and then send the unmarshalled omci message to this onu
-func (dh *deviceHandler) processInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
-	msgID := msg.Header.Id
-	msgType := msg.Header.Type
-	fromTopic := msg.Header.FromTopic
-	toTopic := msg.Header.ToTopic
-	toDeviceID := msg.Header.ToDeviceId
-	proxyDeviceID := msg.Header.ProxyDeviceId
-	logger.Debugw(ctx, "InterAdapter message header", log.Fields{"msgID": msgID, "msgType": msgType,
-		"fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-
-	switch msgType {
-	// case ic.InterAdapterMessageType_ONU_IND_REQUEST: was handled by OpenONUAC already - see comments there
-	//OMCI_RESPONSE also accepted acc. to VOL-3756 (OMCI_REQUEST request was legacy code)
-	case ic.InterAdapterMessageType_OMCI_RESPONSE, ic.InterAdapterMessageType_OMCI_REQUEST:
-		{
-			return dh.processInterAdapterOMCIReceiveMessage(ctx, msg)
-		}
-	case ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST:
-		{
-			return dh.processInterAdapterTechProfileDownloadReqMessage(ctx, msg)
-		}
-	case ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST:
-		{
-			return dh.processInterAdapterDeleteGemPortReqMessage(ctx, msg)
-
-		}
-	case ic.InterAdapterMessageType_DELETE_TCONT_REQUEST:
-		{
-			return dh.processInterAdapterDeleteTcontReqMessage(ctx, msg)
-		}
-	default:
-		{
-			logger.Errorw(ctx, "inter-adapter-unhandled-type", log.Fields{
-				"msgType": msg.Header.Type, "device-id": dh.deviceID})
-			return fmt.Errorf("inter-adapter-unhandled-type: %d, %s", msg.Header.Type, dh.deviceID)
-		}
-	}
-}
-
 //FlowUpdateIncremental removes and/or adds the flow changes on a given device
 func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
-	apOfFlowChanges *openflow_13.FlowChanges,
-	apOfGroupChanges *openflow_13.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
+	apOfFlowChanges *of.FlowChanges,
+	apOfGroupChanges *of.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
 	logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.deviceID, "metadata": apFlowMetaData})
 	var retError error = nil
 	//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
@@ -806,8 +725,11 @@
 		} else {
 			logger.Debugw(ctx, "DeviceStateUpdate upon disable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 				"OperStatus": voltha.OperStatus_UNKNOWN, "device-id": dh.deviceID})
-			if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx),
-				dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+			if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+				DeviceId:   dh.deviceID,
+				ConnStatus: voltha.ConnectStatus_REACHABLE,
+				OperStatus: voltha.OperStatus_UNKNOWN,
+			}); err != nil {
 				//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 				logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 			}
@@ -914,10 +836,16 @@
 		techProfsFound = true // set to true if we found TP once for any UNI port
 		for tpID := range uniData.PersTpPathMap {
 			// Request the TpInstance again from the openolt adapter in case of reconcile
-			iaTechTpInst, err := dh.AdapterProxy.TechProfileInstanceRequest(ctx, uniData.PersTpPathMap[tpID],
-				dh.device.ParentPortNo, dh.device.ProxyAddress.OnuId, uint32(uniData.PersUniID),
-				dh.pOpenOnuAc.config.Topic, dh.ProxyAddressType,
-				dh.parentID, dh.ProxyAddressID)
+			iaTechTpInst, err := dh.getTechProfileInstanceFromParentAdapter(ctx,
+				dh.device.ProxyAddress.AdapterEndpoint,
+				&ic.TechProfileInstanceRequestMessage{
+					DeviceId:       dh.device.Id,
+					TpInstancePath: uniData.PersTpPathMap[tpID],
+					ParentDeviceId: dh.parentID,
+					ParentPonPort:  dh.device.ParentPortNo,
+					OnuId:          dh.device.ProxyAddress.OnuId,
+					UniId:          uint32(uniData.PersUniID),
+				})
 			if err != nil || iaTechTpInst == nil {
 				logger.Errorw(ctx, "error fetching tp instance",
 					log.Fields{"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID, "err": err})
@@ -926,7 +854,7 @@
 			}
 			var tpInst tech_profile.TechProfileInstance
 			switch techTpInst := iaTechTpInst.TechTpInstance.(type) {
-			case *ic.InterAdapterTechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
+			case *ic.TechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
 				tpInst = *techTpInst.TpInstance
 				logger.Debugw(ctx, "received-tp-instance-successfully-after-reconcile", log.Fields{
 					"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
@@ -1152,8 +1080,11 @@
 
 	logger.Debugw(ctx, "call DeviceStateUpdate upon reboot", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 		"OperStatus": voltha.OperStatus_DISCOVERED, "device-id": dh.deviceID})
-	if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.ConnectStatus_REACHABLE,
-		voltha.OperStatus_DISCOVERED); err != nil {
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   dh.deviceID,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+		OperStatus: voltha.OperStatus_DISCOVERED,
+	}); err != nil {
 		//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 		logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 		return
@@ -1294,8 +1225,7 @@
 	dh.lockUpgradeFsm.RLock()
 	if dh.pOnuUpradeFsm != nil {
 		dh.lockUpgradeFsm.RUnlock()
-		onuVolthaDevice, getErr := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx),
-			dh.deviceID, dh.deviceID)
+		onuVolthaDevice, getErr := dh.getDeviceFromCore(ctx, dh.deviceID)
 		if getErr != nil || onuVolthaDevice == nil {
 			logger.Errorw(ctx, "Failed to fetch Onu device for image activation", log.Fields{"device-id": dh.deviceID, "err": getErr})
 			return nil, fmt.Errorf("could not fetch device for device-id: %s", dh.deviceID)
@@ -1358,8 +1288,7 @@
 	dh.lockUpgradeFsm.RLock()
 	if dh.pOnuUpradeFsm != nil {
 		dh.lockUpgradeFsm.RUnlock()
-		onuVolthaDevice, getErr := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx),
-			dh.deviceID, dh.deviceID)
+		onuVolthaDevice, getErr := dh.getDeviceFromCore(ctx, dh.deviceID)
 		if getErr != nil || onuVolthaDevice == nil {
 			logger.Errorw(ctx, "Failed to fetch Onu device for image commitment", log.Fields{"device-id": dh.deviceID, "err": getErr})
 			return nil, fmt.Errorf("could not fetch device for device-id: %s", dh.deviceID)
@@ -1513,7 +1442,9 @@
 
 	if !dh.isReconciling() {
 		logger.Infow(ctx, "DeviceUpdate", log.Fields{"deviceReason": dh.device.Reason, "device-id": dh.deviceID})
-		_ = dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device)
+		if err := dh.updateDeviceInCore(ctx, dh.device); err != nil {
+			logger.Errorw(ctx, "device-update-failed", log.Fields{"device-id": dh.device.Id, "error": err})
+		}
 		//TODO Need to Update Device Reason To CORE as part of device update userstory
 	} else {
 		logger.Debugw(ctx, "reconciling - don't notify core about DeviceUpdate",
@@ -1547,6 +1478,7 @@
 		}
 
 		pPonPort := &voltha.Port{
+			DeviceId:   dh.deviceID,
 			PortNo:     ponPortNo,
 			Label:      fmt.Sprintf("pon-%d", ponPortNo),
 			Type:       voltha.Port_PON_ONU,
@@ -1554,7 +1486,7 @@
 			Peers: []*voltha.Port_PeerPort{{DeviceId: dh.parentID, // Peer device  is OLT
 				PortNo: ponPortNo}}, // Peer port is parent's port number
 		}
-		if err = dh.coreProxy.PortCreated(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, pPonPort); err != nil {
+		if err = dh.createPortInCore(ctx, pPonPort); err != nil {
 			logger.Fatalf(ctx, "Device FSM: PortCreated-failed-%s", err)
 			e.Cancel(err)
 			return
@@ -1814,8 +1746,12 @@
 		}
 		logger.Debugw(ctx, "call DeviceStateUpdate upon create interface", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 			"OperStatus": voltha.OperStatus_ACTIVATING, "device-id": dh.deviceID})
-		if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
-			voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING); err != nil {
+
+		if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+			DeviceId:   dh.deviceID,
+			OperStatus: voltha.OperStatus_ACTIVATING,
+			ConnStatus: voltha.ConnectStatus_REACHABLE,
+		}); err != nil {
 			//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 			logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 		}
@@ -2031,8 +1967,11 @@
 		}
 		logger.Debugw(ctx, "call DeviceStateUpdate upon update interface", log.Fields{"ConnectStatus": voltha.ConnectStatus_UNREACHABLE,
 			"OperStatus": voltha.OperStatus_DISCOVERED, "device-id": dh.deviceID})
-		if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
-			voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_DISCOVERED); err != nil {
+		if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+			DeviceId:   dh.deviceID,
+			ConnStatus: voltha.ConnectStatus_UNREACHABLE,
+			OperStatus: voltha.OperStatus_DISCOVERED,
+		}); err != nil {
 			//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 			logger.Errorw(ctx, "error-updating-device-state unreachable-discovered",
 				log.Fields{"device-id": dh.deviceID, "error": err})
@@ -2208,8 +2147,11 @@
 		// in case of adapter restart connected to an ONU upgrade I would not rely on the image quality
 		// maybe some 'forced' commitment can be done in this situation from system management (or upgrade restarted)
 		dh.checkOnOnuImageCommit(ctx)
-		if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
-			voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
+		if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+			DeviceId:   dh.deviceID,
+			ConnStatus: voltha.ConnectStatus_REACHABLE,
+			OperStatus: voltha.OperStatus_ACTIVE,
+		}); err != nil {
 			//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 			logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 		} else {
@@ -2292,8 +2234,12 @@
 func (dh *deviceHandler) processUniDisableStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
 	logger.Debugw(ctx, "DeviceStateUpdate upon disable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 		"OperStatus": voltha.OperStatus_UNKNOWN, "device-id": dh.deviceID})
-	if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx),
-		dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   dh.deviceID,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+		OperStatus: voltha.OperStatus_UNKNOWN,
+	}); err != nil {
 		//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 		logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 	}
@@ -2322,8 +2268,11 @@
 func (dh *deviceHandler) processUniEnableStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
 	logger.Debugw(ctx, "DeviceStateUpdate upon re-enable", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 		"OperStatus": voltha.OperStatus_ACTIVE, "device-id": dh.deviceID})
-	if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.ConnectStatus_REACHABLE,
-		voltha.OperStatus_ACTIVE); err != nil {
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   dh.deviceID,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+		OperStatus: voltha.OperStatus_ACTIVE,
+	}); err != nil {
 		//TODO with VOL-3045/VOL-3046: return the error and stop further processing
 		logger.Errorw(ctx, "error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
 	}
@@ -2537,7 +2486,16 @@
 			uniPort.setOperState(vc.OperStatus_ACTIVE)
 			if !dh.isReconciling() {
 				//maybe also use getter functions on uniPort - perhaps later ...
-				go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
+				go func(port *onuUniPort) { // use only the port argument in here: the outer uniPort variable may change before this goroutine runs
+					if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+						DeviceId:   dh.deviceID,
+						PortType:   voltha.Port_ETHERNET_UNI,
+						PortNo:     port.portNo,
+						OperStatus: port.operState,
+					}); err != nil {
+						logger.Errorw(ctx, "port-state-update-failed", log.Fields{"error": err, "port-no": port.portNo, "device-id": dh.deviceID})
+					}
+				}(uniPort)
 			} else {
 				logger.Debugw(ctx, "reconciling - don't notify core about PortStateUpdate", log.Fields{"device-id": dh.deviceID})
 			}
@@ -2557,7 +2515,16 @@
 			uniPort.setOperState(vc.OperStatus_UNKNOWN)
 			if !dh.isReconciling() {
 				//maybe also use getter functions on uniPort - perhaps later ...
-				go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
+				go func(port *onuUniPort) { // use only the port argument in here: the outer uniPort variable may change before this goroutine runs
+					if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+						DeviceId:   dh.deviceID,
+						PortType:   voltha.Port_ETHERNET_UNI,
+						PortNo:     port.portNo,
+						OperStatus: port.operState,
+					}); err != nil {
+						logger.Errorw(ctx, "port-state-update-failed", log.Fields{"error": err, "port-no": port.portNo, "device-id": dh.deviceID})
+					}
+				}(uniPort)
 			} else {
 				logger.Debugw(ctx, "reconciling - don't notify core about PortStateUpdate", log.Fields{"device-id": dh.deviceID})
 			}
@@ -2573,7 +2540,7 @@
 	eventContext := make(map[string]string)
 	//Populating event context
 	//  assume giving ParentId in GetDevice twice really gives the ParentDevice (there is no GetParentDevice()...)
-	parentDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.parentID, dh.parentID)
+	parentDevice, err := dh.getDeviceFromCore(ctx, dh.parentID)
 	if err != nil || parentDevice == nil {
 		logger.Errorw(ctx, "Failed to fetch parent device for OnuEvent",
 			log.Fields{"parentID": dh.parentID, "err": err})
@@ -2853,7 +2820,7 @@
 
 	return kvbackend
 }
-func (dh *deviceHandler) getFlowOfbFields(ctx context.Context, apFlowItem *ofp.OfpFlowStats, loMatchVlan *uint16,
+func (dh *deviceHandler) getFlowOfbFields(ctx context.Context, apFlowItem *of.OfpFlowStats, loMatchVlan *uint16,
 	loAddPcp *uint8, loIPProto *uint32) {
 
 	for _, field := range flow.GetOfbFields(apFlowItem) {
@@ -2930,7 +2897,7 @@
 	} //for all OfbFields
 }
 
-func (dh *deviceHandler) getFlowActions(ctx context.Context, apFlowItem *ofp.OfpFlowStats, loSetPcp *uint8, loSetVlan *uint16) {
+func (dh *deviceHandler) getFlowActions(ctx context.Context, apFlowItem *of.OfpFlowStats, loSetPcp *uint8, loSetVlan *uint16) {
 	for _, action := range flow.GetActions(apFlowItem) {
 		switch action.Type {
 		/* not used:
@@ -2976,7 +2943,7 @@
 }
 
 //addFlowItemToUniPort parses the actual flow item to add it to the UniPort
-func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort,
+func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *onuUniPort,
 	apFlowMetaData *voltha.FlowMetadata) error {
 	var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE)      //noValidEntry
 	var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
@@ -3067,7 +3034,7 @@
 }
 
 //removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
-func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort) error {
+func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *onuUniPort) error {
 	//optimization and assumption: the flow cookie uniquely identifies the flow and with that the internal rule
 	//hence only the cookie is used here to find the relevant flow and possibly remove the rule
 	//no extra check is done on the rule parameters
@@ -3300,7 +3267,10 @@
 	dh.setDeviceReason(deviceReason)
 	if notifyCore {
 		//TODO with VOL-3045/VOL-3046: return the error and stop further processing at calling position
-		if err := dh.coreProxy.DeviceReasonUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, deviceReasonMap[deviceReason]); err != nil {
+		if err := dh.updateDeviceReasonInCore(ctx, &ic.DeviceReason{
+			DeviceId: dh.deviceID,
+			Reason:   deviceReasonMap[deviceReason],
+		}); err != nil {
 			logger.Errorf(ctx, "DeviceReasonUpdate error: %s",
 				log.Fields{"device-id": dh.deviceID, "error": err}, deviceReasonMap[deviceReason])
 			return err
@@ -3715,7 +3685,11 @@
 					}
 					logger.Debugw(ctx, "reconciling has been finished in time",
 						log.Fields{"device-id": dh.deviceID})
-					if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.deviceID, connectStatus, operState); err != nil {
+					if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+						DeviceId:   dh.deviceID,
+						ConnStatus: connectStatus,
+						OperStatus: operState,
+					}); err != nil {
 						logger.Errorw(ctx, "unable to update device state to core",
 							log.Fields{"device-id": dh.deviceID, "Err": err})
 					}
@@ -3830,8 +3804,130 @@
 	}
 
 	logger.Debugw(ctx, "Core DeviceStateUpdate", log.Fields{"connectStatus": connectStatus, "operState": voltha.OperStatus_RECONCILING_FAILED})
-	if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.deviceID, connectStatus, voltha.OperStatus_RECONCILING_FAILED); err != nil {
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   dh.deviceID,
+		ConnStatus: connectStatus,
+		OperStatus: voltha.OperStatus_RECONCILING_FAILED,
+	}); err != nil {
 		logger.Errorw(ctx, "unable to update device state to core",
 			log.Fields{"device-id": dh.deviceID, "Err": err})
 	}
 }
+
+/*
+Helper functions to communicate with Core
+*/
+
+func (dh *deviceHandler) getDeviceFromCore(ctx context.Context, deviceID string) (*voltha.Device, error) {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return nil, svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	logger.Debugw(rpcCtx, "get-device-from-core", log.Fields{"device-id": deviceID})
+	return coreSvc.GetDevice(rpcCtx, &vc.ID{Id: deviceID}) // device and error are passed through unchanged
+}
+
+func (dh *deviceHandler) updateDeviceStateInCore(ctx context.Context, deviceStateFilter *ic.DeviceStateFilter) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil || cClient == nil { // err may be nil here when only the client is missing
+		return err
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceStateUpdate(subCtx, deviceStateFilter)
+	logger.Debugw(subCtx, "device-state-updated-in-core", log.Fields{"device-state": deviceStateFilter, "error": err}) // logged on success and failure alike
+	return err
+}
+
+func (dh *deviceHandler) updatePMConfigInCore(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	_, rpcErr := coreSvc.DevicePMConfigUpdate(rpcCtx, pmConfigs)
+	logger.Debugw(rpcCtx, "pmconfig-updated-in-core", log.Fields{"pm-configs": pmConfigs, "error": rpcErr}) // logged on success and failure alike
+	return rpcErr
+}
+
+func (dh *deviceHandler) updateDeviceInCore(ctx context.Context, device *voltha.Device) error {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	_, rpcErr := coreSvc.DeviceUpdate(rpcCtx, device)
+	logger.Debugw(rpcCtx, "device-updated-in-core", log.Fields{"device-id": device.Id, "error": rpcErr}) // logged on success and failure alike
+	return rpcErr
+}
+
+func (dh *deviceHandler) createPortInCore(ctx context.Context, port *voltha.Port) error {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	_, rpcErr := coreSvc.PortCreated(rpcCtx, port)
+	logger.Debugw(rpcCtx, "port-created-in-core", log.Fields{"port": port, "error": rpcErr}) // logged on success and failure alike
+	return rpcErr
+}
+
+func (dh *deviceHandler) updatePortStateInCore(ctx context.Context, portState *ic.PortState) error {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	_, rpcErr := coreSvc.PortStateUpdate(rpcCtx, portState)
+	logger.Debugw(rpcCtx, "port-state-updated-in-core", log.Fields{"port-state": portState, "error": rpcErr}) // logged on success and failure alike
+	return rpcErr
+}
+
+func (dh *deviceHandler) updateDeviceReasonInCore(ctx context.Context, reason *ic.DeviceReason) error {
+	coreSvc, svcErr := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.RPCTimeout)
+	defer release()
+	_, rpcErr := coreSvc.DeviceReasonUpdate(rpcCtx, reason)
+	logger.Debugw(rpcCtx, "device-reason-updated-in-core", log.Fields{"reason": reason, "error": rpcErr}) // logged on success and failure alike
+	return rpcErr
+}
+
+/*
+Helper functions to communicate with parent adapter
+*/
+
+func (dh *deviceHandler) getTechProfileInstanceFromParentAdapter(ctx context.Context, parentEndpoint string,
+	request *ic.TechProfileInstanceRequestMessage) (*ic.TechProfileDownloadMessage, error) {
+	parentSvc, svcErr := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+	if parentSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return nil, svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+	defer release()
+	logger.Debugw(rpcCtx, "get-tech-profile-instance", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+	return parentSvc.GetTechProfileInstance(rpcCtx, request) // reply and error are passed through unchanged
+}
+
+func (dh *deviceHandler) sendOMCIRequest(ctx context.Context, parentEndpoint string, request *ic.OmciMessage) error {
+	parentSvc, svcErr := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+	if parentSvc == nil || svcErr != nil { // svcErr may be nil here when only the client is missing
+		return svcErr
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+	defer release()
+	logger.Debugw(rpcCtx, "send-omci-request", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+	_, rpcErr := parentSvc.ProxyOmciRequest(rpcCtx, request) // the reply payload is discarded; only the error is of interest
+	if rpcErr != nil {
+		logger.Errorw(ctx, "omci-failure", log.Fields{"request": request, "error": rpcErr, "request-parent": request.ParentDeviceId, "request-child": request.ChildDeviceId, "request-proxy": request.ProxyAddress})
+	}
+	return rpcErr
+}