[VOL-4292] OpenOLT Adapter changes for gRPC migration

Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 3dc62b4..c0e8cc8 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -30,26 +30,29 @@
 	"sync"
 	"time"
 
+	"github.com/golang/protobuf/ptypes/empty"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+
 	"github.com/cenkalti/backoff/v3"
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
-	"github.com/opencord/voltha-lib-go/v6/pkg/adapters/adapterif"
-	"github.com/opencord/voltha-lib-go/v6/pkg/config"
-	"github.com/opencord/voltha-lib-go/v6/pkg/events/eventif"
-	flow_utils "github.com/opencord/voltha-lib-go/v6/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	"github.com/opencord/voltha-lib-go/v6/pkg/pmmetrics"
+	"github.com/opencord/voltha-lib-go/v7/pkg/config"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+	flow_utils "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/pmmetrics"
 
+	conf "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
-	"github.com/opencord/voltha-protos/v4/go/common"
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	of "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	oop "github.com/opencord/voltha-protos/v4/go/openolt"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/extension"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	oop "github.com/opencord/voltha-protos/v5/go/openolt"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -68,21 +71,23 @@
 
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
-	cm            *config.ConfigManager
-	device        *voltha.Device
-	coreProxy     adapterif.CoreProxy
-	AdapterProxy  adapterif.AdapterProxy
-	EventProxy    eventif.EventProxy
-	openOLT       *OpenOLT
-	exitChannel   chan int
-	lockDevice    sync.RWMutex
-	Client        oop.OpenoltClient
-	transitionMap *TransitionMap
-	clientCon     *grpc.ClientConn
-	flowMgr       []*OpenOltFlowMgr
-	groupMgr      *OpenOltGroupMgr
-	eventMgr      *OpenOltEventMgr
-	resourceMgr   []*rsrcMgr.OpenOltResourceMgr
+	cm                      *config.ConfigManager
+	device                  *voltha.Device
+	cfg                     *conf.AdapterFlags
+	coreClient              *vgrpc.Client
+	childAdapterClients     map[string]*vgrpc.Client
+	lockChildAdapterClients sync.RWMutex
+	EventProxy              eventif.EventProxy
+	openOLT                 *OpenOLT
+	exitChannel             chan int
+	lockDevice              sync.RWMutex
+	Client                  oop.OpenoltClient
+	transitionMap           *TransitionMap
+	clientCon               *grpc.ClientConn
+	flowMgr                 []*OpenOltFlowMgr
+	groupMgr                *OpenOltGroupMgr
+	eventMgr                *OpenOltEventMgr
+	resourceMgr             []*rsrcMgr.OpenOltResourceMgr
 
 	deviceInfo *oop.DeviceInfo
 
@@ -112,14 +117,15 @@
 
 //OnuDevice represents ONU related info
 type OnuDevice struct {
-	deviceID      string
-	deviceType    string
-	serialNumber  string
-	onuID         uint32
-	intfID        uint32
-	proxyDeviceID string
-	losRaised     bool
-	rdiRaised     bool
+	deviceID        string
+	deviceType      string
+	serialNumber    string
+	onuID           uint32
+	intfID          uint32
+	proxyDeviceID   string
+	losRaised       bool
+	rdiRaised       bool
+	adapterEndpoint string
 }
 
 type onuIndicationMsg struct {
@@ -156,7 +162,7 @@
 }
 
 //NewOnuDevice creates a new Onu Device
-func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string, losRaised bool) *OnuDevice {
+func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string, losRaised bool, adapterEndpoint string) *OnuDevice {
 	var device OnuDevice
 	device.deviceID = devID
 	device.deviceType = deviceTp
@@ -165,15 +171,15 @@
 	device.intfID = intfID
 	device.proxyDeviceID = proxyDevID
 	device.losRaised = losRaised
+	device.adapterEndpoint = adapterEndpoint
 	return &device
 }
 
 //NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager) *DeviceHandler {
+func NewDeviceHandler(cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager, cfg *conf.AdapterFlags) *DeviceHandler {
 	var dh DeviceHandler
 	dh.cm = cm
-	dh.coreProxy = cp
-	dh.AdapterProxy = ap
+	dh.coreClient = cc
 	dh.EventProxy = ep
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	dh.device = cloned
@@ -186,6 +192,8 @@
 	dh.activePorts = sync.Map{}
 	dh.stopIndications = make(chan bool, 1)
 	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
+	dh.childAdapterClients = make(map[string]*vgrpc.Client)
+	dh.cfg = cfg
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
 	dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
@@ -219,6 +227,10 @@
 	defer dh.lockDevice.Unlock()
 	logger.Debug(ctx, "stopping-device-agent")
 	dh.exitChannel <- 1
+
+	// Stop the adapter grpc clients for that parent device
+	dh.deleteAdapterClients(ctx)
+
 	logger.Debug(ctx, "device-agent-stopped")
 }
 
@@ -319,9 +331,19 @@
 		return olterrors.NewErrNotFound("port-label", log.Fields{"port-number": portNum, "port-type": portType}, err)
 	}
 
-	if port, err := dh.coreProxy.GetDevicePort(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portNum); err == nil && port.Type == portType {
+	// Check if port exists
+	port, err := dh.getPortFromCore(ctx, &ic.PortFilter{
+		DeviceId: dh.device.Id,
+		Port:     portNum,
+	})
+	if err == nil && port.Type == portType {
 		logger.Debug(ctx, "port-already-exists-updating-oper-status-of-port")
-		if err := dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portType, portNum, operStatus); err != nil {
+		err = dh.updatePortStateInCore(ctx, &ic.PortState{
+			DeviceId:   dh.device.Id,
+			PortType:   portType,
+			PortNo:     portNum,
+			OperStatus: operStatus})
+		if err != nil {
 			return olterrors.NewErrAdapter("failed-to-update-port-state", log.Fields{
 				"device-id":   dh.device.Id,
 				"port-type":   portType,
@@ -330,9 +352,11 @@
 		}
 		return nil
 	}
+
 	// Now create Port
 	capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
-	port := &voltha.Port{
+	port = &voltha.Port{
+		DeviceId:   dh.device.Id,
 		PortNo:     portNum,
 		Label:      label,
 		Type:       portType,
@@ -350,7 +374,8 @@
 	}
 	logger.Debugw(ctx, "sending-port-update-to-core", log.Fields{"port": port})
 	// Synchronous call to update device - this method is run in its own go routine
-	if err := dh.coreProxy.PortCreated(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, port); err != nil {
+	err = dh.createPortInCore(ctx, port)
+	if err != nil {
 		return olterrors.NewErrAdapter("error-creating-port", log.Fields{
 			"device-id": dh.device.Id,
 			"port-type": portType}, err)
@@ -360,7 +385,7 @@
 }
 
 func (dh *DeviceHandler) updateLocalDevice(ctx context.Context) {
-	device, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, dh.device.Id)
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
 	if err != nil || device == nil {
 		logger.Errorf(ctx, "device-not-found", log.Fields{"device-id": dh.device.Id}, err)
 		return
@@ -658,9 +683,12 @@
 	}
 
 	// Synchronous call to update device state - this method is run in its own go routine
-	if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
-		voltha.OperStatus_ACTIVE); err != nil {
-		return olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   dh.device.Id,
+		OperStatus: voltha.OperStatus_ACTIVE,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+	}); err != nil {
+		return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
 
 	//Clear olt communication failure event
@@ -688,7 +716,7 @@
 func (dh *DeviceHandler) doStateDown(ctx context.Context) error {
 	logger.Debugw(ctx, "do-state-down-start", log.Fields{"device-id": dh.device.Id})
 
-	device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
 	if err != nil || device == nil {
 		/*TODO: needs to handle error scenarios */
 		return olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err)
@@ -702,12 +730,16 @@
 	dh.device = cloned
 	dh.lockDevice.Unlock()
 
-	if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+	if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   cloned.Id,
+		OperStatus: cloned.OperStatus,
+		ConnStatus: cloned.ConnectStatus,
+	}); err != nil {
 		return olterrors.NewErrAdapter("state-update-failed", log.Fields{"device-id": device.Id}, err)
 	}
 
 	//get the child device for the parent device
-	onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
+	onuDevices, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
 	if err != nil {
 		return olterrors.NewErrAdapter("child-device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
@@ -715,11 +747,20 @@
 		// Update onu state as down in onu adapter
 		onuInd := oop.OnuIndication{}
 		onuInd.OperState = "down"
-		err := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
-			dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+
+		ogClient, err := dh.getChildAdapterServiceClient(onuDevice.AdapterEndpoint)
+		if err != nil {
+			return err
+		}
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		_, err = ogClient.OnuIndication(subCtx, &ic.OnuIndicationMessage{
+			DeviceId:      onuDevice.Id,
+			OnuIndication: &onuInd,
+		})
+		cancel()
 		if err != nil {
 			_ = olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
-				"source":        dh.openOLT.config.Topic,
+				"source":        dh.openOLT.config.AdapterEndpoint,
 				"onu-indicator": onuInd,
 				"device-type":   onuDevice.Type,
 				"device-id":     onuDevice.Id}, err).LogAt(log.ErrorLevel)
@@ -783,7 +824,7 @@
 	logger.Debugw(ctx, "olt-device-connected", log.Fields{"device-id": dh.device.Id})
 
 	// Case where OLT is disabled and then rebooted.
-	device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
 	if err != nil || device == nil {
 		/*TODO: needs to handle error scenarios */
 		return olterrors.NewErrAdapter("device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
@@ -795,7 +836,12 @@
 		cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
 		dh.device = cloned
-		if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+
+		if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+			DeviceId:   cloned.Id,
+			OperStatus: cloned.OperStatus,
+			ConnStatus: cloned.ConnectStatus,
+		}); err != nil {
 			return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
 
@@ -824,13 +870,13 @@
 		return nil
 	}
 
-	ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
+	ports, err := dh.listDevicePortsFromCore(ctx, dh.device.Id)
 	if err != nil {
 		/*TODO: needs to handle error scenarios */
 		return olterrors.NewErrAdapter("fetch-ports-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
-	dh.populateActivePorts(ctx, ports)
-	if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
+	dh.populateActivePorts(ctx, ports.Items)
+	if err := dh.disableAdminDownPorts(ctx, ports.Items); err != nil {
 		return olterrors.NewErrAdapter("port-status-update-failed", log.Fields{"ports": ports}, err)
 	}
 
@@ -932,7 +978,7 @@
 	}
 
 	// Synchronous call to update device - this method is run in its own go routine
-	if err := dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device); err != nil {
+	if err = dh.updateDeviceInCore(ctx, dh.device); err != nil {
 		return nil, olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
 	}
 
@@ -948,12 +994,12 @@
 			return
 		case <-time.After(time.Duration(dh.metrics.ToPmConfigs().DefaultFreq) * time.Second):
 
-			ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
+			ports, err := dh.listDevicePortsFromCore(ctx, dh.device.Id)
 			if err != nil {
 				logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "err": err})
 				continue
 			}
-			for _, port := range ports {
+			for _, port := range ports.Items {
 				// NNI Stats
 				if port.Type == voltha.Port_ETHERNET_NNI {
 					intfID := PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)
@@ -989,7 +1035,14 @@
 	dh.transitionMap.Handle(ctx, DeviceInit)
 
 	// Now, set the initial PM configuration for that device
-	if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
+	cgClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		logger.Errorw(ctx, "no-core-connection", log.Fields{"device-id": dh.device.Id, "error": err})
+		return
+	}
+
+	// Send the PM configuration to the core through the core service client
+	if _, err := cgClient.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
 		_ = olterrors.NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
 	}
 }
@@ -1014,20 +1067,21 @@
 	}, nil
 }
 
-// GetInterAdapterTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
-func (dh *DeviceHandler) GetInterAdapterTechProfileDownloadMessage(ctx context.Context, tpPath string, ponPortNum uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
-	ifID, err := IntfIDFromPonPortNum(ctx, ponPortNum)
+// GetTechProfileDownloadMessage maps request.ParentPonPort to its interface ID and returns the TechProfileDownloadMessage from that interface's flow manager.
+func (dh *DeviceHandler) GetTechProfileDownloadMessage(ctx context.Context, request *ic.TechProfileInstanceRequestMessage) (*ic.TechProfileDownloadMessage, error) {
+	ifID, err := IntfIDFromPonPortNum(ctx, request.ParentPonPort)
 	if err != nil {
-		return nil
+		return nil, err
 	}
-	return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, tpPath, ifID, onuID, uniID)
+	return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, request.TpInstancePath, request.OnuId, request.DeviceId)
 }
 
 func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
-	logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
+	logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "parent-device-id": dh.device.Id})
 	var deviceType string
 	var deviceID string
 	var proxyDeviceID string
+	var childAdapterEndpoint string
 
 	transid := extractOmciTransactionID(omciInd.Pkt)
 	if logger.V(log.DebugLevel) {
@@ -1041,11 +1095,12 @@
 
 		logger.Debugw(ctx, "omci-indication-for-a-device-not-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
 		ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
-		kwargs := make(map[string]interface{})
-		kwargs["onu_id"] = omciInd.OnuId
-		kwargs["parent_port_no"] = ponPort
 
-		onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+		onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+			ParentId:     dh.device.Id,
+			OnuId:        omciInd.OnuId,
+			ParentPortNo: ponPort,
+		})
 		if err != nil {
 			return olterrors.NewErrNotFound("onu", log.Fields{
 				"intf-id": omciInd.IntfId,
@@ -1054,81 +1109,73 @@
 		deviceType = onuDevice.Type
 		deviceID = onuDevice.Id
 		proxyDeviceID = onuDevice.ProxyAddress.DeviceId
+		childAdapterEndpoint = onuDevice.AdapterEndpoint
 		//if not exist in cache, then add to cache.
-		dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID, false))
+		dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID, false, onuDevice.AdapterEndpoint))
 	} else {
 		//found in cache
 		logger.Debugw(ctx, "omci-indication-for-a-device-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
 		deviceType = onuInCache.(*OnuDevice).deviceType
 		deviceID = onuInCache.(*OnuDevice).deviceID
 		proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
+		childAdapterEndpoint = onuInCache.(*OnuDevice).adapterEndpoint
 	}
 
-	omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
-	if err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx), omciMsg,
-		ic.InterAdapterMessageType_OMCI_RESPONSE, dh.openOLT.config.Topic, deviceType,
-		deviceID, proxyDeviceID, ""); err != nil {
+	if err := dh.sendOmciIndicationToChildAdapter(ctx, childAdapterEndpoint, &ic.OmciMessage{
+		ParentDeviceId: proxyDeviceID,
+		ChildDeviceId:  deviceID,
+		Message:        omciInd.Pkt,
+	}); err != nil {
 		return olterrors.NewErrCommunication("omci-request", log.Fields{
-			"source":          dh.openOLT.config.Topic,
-			"destination":     deviceType,
+			"source":          dh.openOLT.config.AdapterEndpoint,
+			"device-type":     deviceType,
+			"destination":     childAdapterEndpoint,
 			"onu-id":          deviceID,
 			"proxy-device-id": proxyDeviceID}, err)
 	}
 	return nil
 }
 
-//ProcessInterAdapterMessage sends the proxied messages to the target device
-// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
-// is meant, and then send the unmarshalled omci message to this onu
-func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
-	logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
-	if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
-		return dh.handleInterAdapterOmciMsg(ctx, msg)
-	}
-	return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
-}
+// //ProcessInterAdapterMessage sends the proxied messages to the target device
+// // If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
+// // is meant, and then send the unmarshalled omci message to this onu
+// func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
+// 	logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
+// 	if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
+// 		return dh.handleInterAdapterOmciMsg(ctx, msg)
+// 	}
+// 	return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+// }
 
-func (dh *DeviceHandler) handleInterAdapterOmciMsg(ctx context.Context, msg *ic.InterAdapterMessage) error {
-	msgID := msg.Header.Id
-	fromTopic := msg.Header.FromTopic
-	toTopic := msg.Header.ToTopic
-	toDeviceID := msg.Header.ToDeviceId
-	proxyDeviceID := msg.Header.ProxyDeviceId
-
-	logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-
-	msgBody := msg.GetBody()
-
-	omciMsg := &ic.InterAdapterOmciMessage{}
-	if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
-		return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
-	}
+// ProxyOmciMessage sends the proxied OMCI message to the target device
+func (dh *DeviceHandler) ProxyOmciMessage(ctx context.Context, omciMsg *ic.OmciMessage) error {
+	logger.Debugw(ctx, "proxy-omci-message", log.Fields{"parent-device-id": omciMsg.ParentDeviceId, "child-device-id": omciMsg.ChildDeviceId, "proxy-address": omciMsg.ProxyAddress, "connect-status": omciMsg.ConnectStatus})
 
 	if omciMsg.GetProxyAddress() == nil {
-		onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
+		onuDevice, err := dh.getDeviceFromCore(ctx, omciMsg.ChildDeviceId)
 		if err != nil {
 			return olterrors.NewErrNotFound("onu", log.Fields{
-				"device-id":     dh.device.Id,
-				"onu-device-id": toDeviceID}, err)
+				"parent-device-id": dh.device.Id,
+				"child-device-id":  omciMsg.ChildDeviceId}, err)
 		}
-		logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-		if err := dh.sendProxiedOmciMessage(ctx, onuDevice, omciMsg); err != nil {
+		logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"onu-device-proxy-address": onuDevice.ProxyAddress})
+		if err := dh.sendProxiedMessage(log.WithSpanFromContext(context.Background(), ctx), onuDevice, omciMsg); err != nil {
 			return olterrors.NewErrCommunication("send-failed", log.Fields{
-				"device-id":     dh.device.Id,
-				"onu-device-id": toDeviceID}, err)
+				"parent-device-id": dh.device.Id,
+				"child-device-id":  omciMsg.ChildDeviceId}, err)
 		}
 	} else {
-		logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-		if err := dh.sendProxiedOmciMessage(ctx, nil, omciMsg); err != nil {
+		logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"onu-device-proxy-address": omciMsg.ProxyAddress})
+		if err := dh.sendProxiedMessage(log.WithSpanFromContext(context.Background(), ctx), nil, omciMsg); err != nil {
 			return olterrors.NewErrCommunication("send-failed", log.Fields{
-				"device-id":     dh.device.Id,
-				"onu-device-id": toDeviceID}, err)
+				"parent-device-id": dh.device.Id,
+				"child-device-id":  omciMsg.ChildDeviceId}, err)
 		}
 	}
 	return nil
 }
 
-func (dh *DeviceHandler) sendProxiedOmciMessage(ctx context.Context, onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) error {
+func (dh *DeviceHandler) sendProxiedMessage(ctx context.Context, onuDevice *voltha.Device, omciMsg *ic.OmciMessage) error {
 	var intfID uint32
 	var onuID uint32
 	var connectStatus common.ConnectStatus_Types
@@ -1200,13 +1247,6 @@
 	sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
 	logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
 
-	kwargs := make(map[string]interface{})
-	if sn != "" {
-		kwargs["serial_number"] = sn
-	} else {
-		return olterrors.NewErrInvalidValue(log.Fields{"serial-number": sn}, nil)
-	}
-
 	var alarmInd oop.OnuAlarmIndication
 	raisedTs := time.Now().Unix()
 	if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
@@ -1244,7 +1284,10 @@
 
 	// check the ONU is already know to the OLT
 	// NOTE the second time the ONU is discovered this should return a device
-	onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
+	onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+		ParentId:     dh.device.Id,
+		SerialNumber: sn,
+	})
 
 	if err != nil {
 		logger.Debugw(ctx, "core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
@@ -1280,8 +1323,14 @@
 				"serial-number": sn}, err)
 		}
 
-		if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
-			"", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
+		if onuDevice, err = dh.sendChildDeviceDetectedToCore(ctx, &ic.DeviceDiscovery{
+			ParentId:     dh.device.Id,
+			ParentPortNo: parentPortNo,
+			ChannelId:    channelID,
+			VendorId:     string(onuDiscInd.SerialNumber.GetVendorId()),
+			SerialNumber: sn,
+			OnuId:        onuID,
+		}); err != nil {
 			dh.discOnus.Delete(sn)
 			dh.resourceMgr[ponintfid].FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
 			return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
@@ -1298,6 +1347,14 @@
 				"device-id": dh.device.Id})
 	}
 
+	// Setup the gRPC connection to the adapter responsible for that onuDevice, if not setup yet
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	err = dh.setupChildInterAdapterClient(subCtx, onuDevice.AdapterEndpoint)
+	cancel()
+	if err != nil {
+		return olterrors.NewErrCommunication("no-connection-to-child-adapter", log.Fields{"device-id": onuDevice.Id}, err)
+	}
+
 	// we can now use the existing ONU Id
 	onuID = onuDevice.ProxyAddress.OnuId
 	//Insert the ONU into cache to use in OnuIndication.
@@ -1308,17 +1365,23 @@
 			"sn":     sn})
 	onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
 
-	onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false)
+	onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint)
 	dh.onus.Store(onuKey, onuDev)
 	logger.Debugw(ctx, "new-onu-device-discovered",
 		log.Fields{"onu": onuDev,
 			"sn": sn})
 
-	if err := dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED); err != nil {
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:       onuDevice.Id,
+		ParentDeviceId: dh.device.Id,
+		OperStatus:     common.OperStatus_DISCOVERED,
+		ConnStatus:     common.ConnectStatus_REACHABLE,
+	}); err != nil {
 		return olterrors.NewErrAdapter("failed-to-update-device-state", log.Fields{
 			"device-id":     onuDevice.Id,
 			"serial-number": sn}, err)
 	}
+
 	logger.Infow(ctx, "onu-discovered-reachable", log.Fields{"device-id": onuDevice.Id, "sn": sn})
 	if err := dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn); err != nil {
 		return olterrors.NewErrAdapter("onu-activation-failed", log.Fields{
@@ -1330,7 +1393,6 @@
 
 func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
 
-	kwargs := make(map[string]interface{})
 	ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
 	var onuDevice *voltha.Device
 	var err error
@@ -1349,19 +1411,21 @@
 		//If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
 		foundInCache = true
 		errFields["onu-id"] = onuInCache.(*OnuDevice).deviceID
-		onuDevice, err = dh.coreProxy.GetDevice(ctx, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
+		onuDevice, err = dh.getDeviceFromCore(ctx, onuInCache.(*OnuDevice).deviceID)
 	} else {
 		//If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
 		if serialNumber != "" {
-			kwargs["serial_number"] = serialNumber
 			errFields["serial-number"] = serialNumber
 		} else {
-			kwargs["onu_id"] = onuInd.OnuId
-			kwargs["parent_port_no"] = ponPort
 			errFields["onu-id"] = onuInd.OnuId
 			errFields["parent-port-no"] = ponPort
 		}
-		onuDevice, err = dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+		onuDevice, err = dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+			ParentId:     dh.device.Id,
+			SerialNumber: serialNumber,
+			OnuId:        onuInd.OnuId,
+			ParentPortNo: ponPort,
+		})
 	}
 
 	if err != nil || onuDevice == nil {
@@ -1383,7 +1447,7 @@
 	if !foundInCache {
 		onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
 
-		dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false))
+		dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint))
 
 	}
 	if onuInd.OperState == "down" && onuInd.FailReason != oop.OnuIndication_ONU_ACTIVATION_FAIL_REASON_NONE {
@@ -1412,27 +1476,17 @@
 	}
 
 	switch onuInd.OperState {
-	case "down":
+	case "up", "down":
 		logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
-		// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
-		err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
-			dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+
+		err := dh.sendOnuIndicationToChildAdapter(ctx, onuDevice.AdapterEndpoint, &ic.OnuIndicationMessage{
+			DeviceId:      onuDevice.Id,
+			OnuIndication: onuInd,
+		})
 		if err != nil {
 			return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
 				"onu-indicator": onuInd,
-				"source":        dh.openOLT.config.Topic,
-				"device-type":   onuDevice.Type,
-				"device-id":     onuDevice.Id}, err)
-		}
-	case "up":
-		logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
-		// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
-		err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
-			dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
-		if err != nil {
-			return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
-				"onu-indicator": onuInd,
-				"source":        dh.openOLT.config.Topic,
+				"source":        dh.openOLT.config.AdapterEndpoint,
 				"device-type":   onuDevice.Type,
 				"device-id":     onuDevice.Id}, err)
 		}
@@ -1485,10 +1539,13 @@
 		log.Fields{"pon-port": parentPort,
 			"onu-id":    onuID,
 			"device-id": dh.device.Id})
-	kwargs := make(map[string]interface{})
-	kwargs["onu_id"] = onuID
-	kwargs["parent_port_no"] = parentPort
-	onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+
+	onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+		ParentId:     dh.device.Id,
+		OnuId:        onuID,
+		ParentPortNo: parentPort,
+	})
+
 	if err != nil {
 		return nil, olterrors.NewErrNotFound("onu-device", log.Fields{
 			"intf-id": parentPort,
@@ -1509,7 +1566,12 @@
 			"device-id": dh.device.Id,
 		})
 	}
-	if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPort, packetPayload); err != nil {
+
+	if err := dh.sendPacketToCore(ctx, &ic.PacketIn{
+		DeviceId: dh.device.Id,
+		Port:     logicalPort,
+		Packet:   packetPayload,
+	}); err != nil {
 		return olterrors.NewErrCommunication("packet-send-failed", log.Fields{
 			"source":       "adapter",
 			"destination":  "core",
@@ -1646,8 +1708,13 @@
 	cloned := proto.Clone(device).(*voltha.Device)
 	//Update device Admin state
 	dh.device = cloned
+
 	// Update the all pon ports state on that device to disable and NNI remains active as NNI remains active in openolt agent.
-	if err := dh.coreProxy.PortsStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), cloned.Id, ^uint32(1<<voltha.Port_PON_OLT), voltha.OperStatus_UNKNOWN); err != nil {
+	if err := dh.updatePortsStateInCore(ctx, &ic.PortStateFilter{
+		DeviceId:       cloned.Id,
+		PortTypeFilter: ^uint32(1 << voltha.Port_PON_OLT),
+		OperStatus:     voltha.OperStatus_UNKNOWN,
+	}); err != nil {
 		return olterrors.NewErrAdapter("ports-state-update-failed", log.Fields{"device-id": device.Id}, err)
 	}
 	logger.Debugw(ctx, "disable-device-end", log.Fields{"device-id": device.Id})
@@ -1658,18 +1725,21 @@
 	// Update onu state as unreachable in onu adapter
 	onuInd := oop.OnuIndication{}
 	onuInd.OperState = state
+
 	//get the child device for the parent device
-	onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
+	onuDevices, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
 	if err != nil {
 		logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "err": err})
 	}
 	if onuDevices != nil {
 		for _, onuDevice := range onuDevices.Items {
-			err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.TODO(), ctx), &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
-				dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+			err := dh.sendOnuIndicationToChildAdapter(ctx, onuDevice.AdapterEndpoint, &ic.OnuIndicationMessage{
+				DeviceId:      onuDevice.Id,
+				OnuIndication: &onuInd,
+			})
 			if err != nil {
 				logger.Errorw(ctx, "failed-to-send-inter-adapter-message", log.Fields{"OnuInd": onuInd,
-					"From Adapter": dh.openOLT.config.Topic, "DeviceType": onuDevice.Type, "device-id": onuDevice.Id})
+					"From Adapter": dh.openOLT.config.AdapterEndpoint, "DeviceType": onuDevice.Type, "device-id": onuDevice.Id})
 			}
 
 		}
@@ -1691,19 +1761,22 @@
 	logger.Debug(ctx, "olt-reenabled")
 
 	// Update the all ports state on that device to enable
-
-	ports, err := dh.coreProxy.ListDevicePorts(ctx, device.Id)
+	ports, err := dh.listDevicePortsFromCore(ctx, device.Id)
 	if err != nil {
 		return olterrors.NewErrAdapter("list-ports-failed", log.Fields{"device-id": device.Id}, err)
 	}
-	if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
+	if err := dh.disableAdminDownPorts(ctx, ports.Items); err != nil {
 		return olterrors.NewErrAdapter("port-status-update-failed-after-olt-reenable", log.Fields{"device": device}, err)
 	}
 	//Update the device oper status as ACTIVE
 	device.OperStatus = voltha.OperStatus_ACTIVE
 	dh.device = device
 
-	if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), device.Id, device.ConnectStatus, device.OperStatus); err != nil {
+	if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+		DeviceId:   device.Id,
+		OperStatus: device.OperStatus,
+		ConnStatus: device.ConnectStatus,
+	}); err != nil {
 		return olterrors.NewErrAdapter("state-update-failed", log.Fields{
 			"device-id":      device.Id,
 			"connect-status": device.ConnectStatus,
@@ -1858,7 +1931,11 @@
 		})
 	}
 
-	if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPortNum, packetIn.Pkt); err != nil {
+	if err := dh.sendPacketToCore(ctx, &ic.PacketIn{
+		DeviceId: dh.device.Id,
+		Port:     logicalPortNum,
+		Packet:   packetIn.Pkt,
+	}); err != nil {
 		return olterrors.NewErrCommunication("send-packet-in", log.Fields{
 			"destination": "core",
 			"source":      dh.device.Type,
@@ -1877,7 +1954,7 @@
 }
 
 // PacketOut sends packet-out from VOLTHA to OLT on the egress port provided
-func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo int, packet *of.OfpPacketOut) error {
+func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo uint32, packet *of.OfpPacketOut) error {
 	if logger.V(log.DebugLevel) {
 		logger.Debugw(ctx, "incoming-packet-out", log.Fields{
 			"device-id":      dh.device.Id,
@@ -2033,7 +2110,7 @@
 }
 
 func (dh *DeviceHandler) updateStateUnreachable(ctx context.Context) {
-	device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
 	if err != nil || device == nil {
 		// One case where we have seen core returning an error for GetDevice call is after OLT device delete.
 		// After OLT delete, the adapter asks for OLT to reboot. When OLT is rebooted, shortly we loose heartbeat.
@@ -2048,18 +2125,29 @@
 	logger.Debugw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
 		"admin-state": device.AdminState, "oper-status": device.OperStatus})
 	if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
-		if err = dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+		if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+			DeviceId:   dh.device.Id,
+			OperStatus: voltha.OperStatus_UNKNOWN,
+			ConnStatus: voltha.ConnectStatus_UNREACHABLE,
+		}); err != nil {
 			_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
-		if err = dh.coreProxy.PortsStateUpdate(ctx, dh.device.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
+
+		if err = dh.updatePortsStateInCore(ctx, &ic.PortStateFilter{
+			DeviceId:       dh.device.Id,
+			PortTypeFilter: 0,
+			OperStatus:     voltha.OperStatus_UNKNOWN,
+		}); err != nil {
 			_ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
 		}
 
 		//raise olt communication failure event
 		raisedTs := time.Now().Unix()
-		device.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
-		device.OperStatus = voltha.OperStatus_UNKNOWN
-		go dh.eventMgr.oltCommunicationEvent(ctx, device, raisedTs)
+		cloned := proto.Clone(device).(*voltha.Device)
+		cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+		cloned.OperStatus = voltha.OperStatus_UNKNOWN
+		dh.device = cloned // update local copy of the device
+		go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
 
 		dh.cleanupDeviceResources(ctx)
 		// Stop the Stats collector
@@ -2141,7 +2229,12 @@
 		dh.activePorts.Store(ponID, false)
 		logger.Infow(ctx, "disabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
 	}
-	if err := dh.coreProxy.PortStateUpdate(ctx, dh.device.Id, voltha.Port_PON_OLT, port.PortNo, operStatus); err != nil {
+	if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+		DeviceId:   dh.device.Id,
+		PortType:   voltha.Port_PON_OLT,
+		PortNo:     port.PortNo,
+		OperStatus: operStatus,
+	}); err != nil {
 		return olterrors.NewErrAdapter("port-state-update-failed", log.Fields{
 			"device-id": dh.device.Id,
 			"port":      port.PortNo}, err)
@@ -2695,3 +2788,303 @@
 
 	return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_REASON_UNDEFINED)
 }
+
+/*
+Helper functions to communicate with Core
+*/
+
+// getDeviceFromCore asks the core for the device deviceID under an RPCTimeout deadline; with no core client it returns (nil, err).
+func (dh *DeviceHandler) getDeviceFromCore(ctx context.Context, deviceID string) (*voltha.Device, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.GetDevice(rpcCtx, &common.ID{Id: deviceID})
+}
+// getChildDeviceFromCore asks the core for the child device matching childDeviceFilter under an RPCTimeout deadline.
+func (dh *DeviceHandler) getChildDeviceFromCore(ctx context.Context, childDeviceFilter *ic.ChildDeviceFilter) (*voltha.Device, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.GetChildDevice(rpcCtx, childDeviceFilter)
+}
+// updateDeviceStateInCore sends deviceStateFilter to the core's DeviceStateUpdate RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) updateDeviceStateInCore(ctx context.Context, deviceStateFilter *ic.DeviceStateFilter) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.DeviceStateUpdate(rpcCtx, deviceStateFilter)
+	return err
+}
+// getChildDevicesFromCore asks the core for the child devices of deviceID under an RPCTimeout deadline.
+func (dh *DeviceHandler) getChildDevicesFromCore(ctx context.Context, deviceID string) (*voltha.Devices, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.GetChildDevices(rpcCtx, &common.ID{Id: deviceID})
+}
+// listDevicePortsFromCore asks the core for the ports of deviceID under an RPCTimeout deadline.
+func (dh *DeviceHandler) listDevicePortsFromCore(ctx context.Context, deviceID string) (*voltha.Ports, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.ListDevicePorts(rpcCtx, &common.ID{Id: deviceID})
+}
+// updateDeviceInCore sends device to the core's DeviceUpdate RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) updateDeviceInCore(ctx context.Context, device *voltha.Device) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.DeviceUpdate(rpcCtx, device)
+	return err
+}
+// sendChildDeviceDetectedToCore forwards deviceDiscoveryInfo to the core's ChildDeviceDetected RPC and returns its result.
+func (dh *DeviceHandler) sendChildDeviceDetectedToCore(ctx context.Context, deviceDiscoveryInfo *ic.DeviceDiscovery) (*voltha.Device, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.ChildDeviceDetected(rpcCtx, deviceDiscoveryInfo)
+}
+// sendPacketToCore sends pkt to the core's SendPacketIn RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) sendPacketToCore(ctx context.Context, pkt *ic.PacketIn) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.SendPacketIn(rpcCtx, pkt)
+	return err
+}
+// createPortInCore sends port to the core's PortCreated RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) createPortInCore(ctx context.Context, port *voltha.Port) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.PortCreated(rpcCtx, port)
+	return err
+}
+// updatePortsStateInCore sends portFilter to the core's PortsStateUpdate RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) updatePortsStateInCore(ctx context.Context, portFilter *ic.PortStateFilter) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.PortsStateUpdate(rpcCtx, portFilter)
+	return err
+}
+// updatePortStateInCore sends portState to the core's PortStateUpdate RPC under an RPCTimeout deadline.
+func (dh *DeviceHandler) updatePortStateInCore(ctx context.Context, portState *ic.PortState) error {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = coreSvc.PortStateUpdate(rpcCtx, portState)
+	return err
+}
+// getPortFromCore asks the core's GetDevicePort RPC for the port matching portFilter under an RPCTimeout deadline.
+func (dh *DeviceHandler) getPortFromCore(ctx context.Context, portFilter *ic.PortFilter) (*voltha.Port, error) {
+	coreSvc, err := dh.coreClient.GetCoreServiceClient()
+	if coreSvc == nil || err != nil {
+		return nil, err
+	}
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	return coreSvc.GetDevicePort(rpcCtx, portFilter)
+}
+/*
+Helper functions to communicate with child adapter
+*/
+
+// sendOmciIndicationToChildAdapter sends response to the OmciIndication RPC of the child adapter at childEndpoint.
+func (dh *DeviceHandler) sendOmciIndicationToChildAdapter(ctx context.Context, childEndpoint string, response *ic.OmciMessage) error {
+	childSvc, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if childSvc == nil || err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "sending-omci-response", log.Fields{"response": response, "child-endpoint": childEndpoint})
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = childSvc.OmciIndication(rpcCtx, response)
+	return err
+}
+// sendOnuIndicationToChildAdapter sends onuInd to the OnuIndication RPC of the child adapter at childEndpoint.
+func (dh *DeviceHandler) sendOnuIndicationToChildAdapter(ctx context.Context, childEndpoint string, onuInd *ic.OnuIndicationMessage) error {
+	childSvc, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if childSvc == nil || err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "sending-onu-indication", log.Fields{"onu-indication": onuInd, "child-endpoint": childEndpoint})
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = childSvc.OnuIndication(rpcCtx, onuInd)
+	return err
+}
+// sendDeleteTContToChildAdapter sends tContInfo to the DeleteTCont RPC of the child adapter at childEndpoint.
+func (dh *DeviceHandler) sendDeleteTContToChildAdapter(ctx context.Context, childEndpoint string, tContInfo *ic.DeleteTcontMessage) error {
+	childSvc, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if childSvc == nil || err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "sending-delete-tcont", log.Fields{"tcont": tContInfo, "child-endpoint": childEndpoint})
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = childSvc.DeleteTCont(rpcCtx, tContInfo)
+	return err
+}
+// sendDeleteGemPortToChildAdapter sends gemPortInfo to the DeleteGemPort RPC of the child adapter at childEndpoint.
+func (dh *DeviceHandler) sendDeleteGemPortToChildAdapter(ctx context.Context, childEndpoint string, gemPortInfo *ic.DeleteGemPortMessage) error {
+	childSvc, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if childSvc == nil || err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "sending-delete-gem-port", log.Fields{"gem-port-info": gemPortInfo, "child-endpoint": childEndpoint})
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = childSvc.DeleteGemPort(rpcCtx, gemPortInfo)
+	return err
+}
+// sendDownloadTechProfileToChildAdapter sends tpDownloadInfo to the DownloadTechProfile RPC of the child adapter at childEndpoint.
+func (dh *DeviceHandler) sendDownloadTechProfileToChildAdapter(ctx context.Context, childEndpoint string, tpDownloadInfo *ic.TechProfileDownloadMessage) error {
+	childSvc, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if childSvc == nil || err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "sending-tech-profile-download", log.Fields{"tp-download-info": tpDownloadInfo, "child-endpoint": childEndpoint})
+	rpcCtx, release := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer release()
+	_, err = childSvc.DownloadTechProfile(rpcCtx, tpDownloadInfo)
+	return err
+}
+/*
+Helper functions for remote communication
+*/
+
+// setupChildInterAdapterClient creates, caches and starts the gRPC client for the child adapter at endpoint,
+// then blocks until that client hands out an ONU inter-adapter service client. It returns nil right away when
+// a client is already cached for endpoint, and an error when the client cannot be created or the wait is aborted.
+// TODO: Use a connection tracker such that the adapter connection is stopped when the last device that adapter
+// supports is deleted
+func (dh *DeviceHandler) setupChildInterAdapterClient(ctx context.Context, endpoint string) error {
+	logger.Infow(ctx, "setting-child-adapter-connection", log.Fields{"child-endpoint": endpoint})
+	dh.lockChildAdapterClients.Lock()
+	defer dh.lockChildAdapterClients.Unlock()
+	if _, ok := dh.childAdapterClients[endpoint]; ok {
+		// Already set
+		return nil
+	}
+
+	// Setup child's adapter grpc connection; cache it only once created so a failure leaves no nil map entry.
+	childClient, err := vgrpc.NewClient(endpoint, dh.onuAdapterRestarted, vgrpc.ActivityCheck(true))
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": endpoint})
+		return err
+	}
+	dh.childAdapterClients[endpoint] = childClient
+	go childClient.Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestAdapterServiceHandler)
+
+	// Wait until we have a connection to the child adapter: unlimited retries or until subCtx expires.
+	// NOTE(review): subCtx is built on context.TODO(); presumably it does not carry ctx's deadline - confirm.
+	subCtx := log.WithSpanFromContext(context.TODO(), ctx)
+	connBackoff := vgrpc.NewBackoff(dh.cfg.MinBackoffRetryDelay, dh.cfg.MaxBackoffRetryDelay, 0)
+	for {
+		client, err := childClient.GetOnuInterAdapterServiceClient()
+		if err == nil && client != nil {
+			logger.Infow(subCtx, "connected-to-child-adapter", log.Fields{"child-endpoint": endpoint})
+			return nil
+		}
+		logger.Warnw(subCtx, "connection-to-child-adapter-not-ready", log.Fields{"error": err, "child-endpoint": endpoint})
+		// Backoff; a failure here ends the wait and is reported to the caller instead of being swallowed.
+		if err = connBackoff.Backoff(subCtx); err != nil {
+			logger.Errorw(subCtx, "received-error-on-backoff", log.Fields{"error": err, "child-endpoint": endpoint})
+			return err
+		}
+	}
+}
+
+// getChildAdapterServiceClient returns the ONU inter-adapter service client for the child adapter at endpoint.
+//
+// The gRPC client cached for endpoint is used when there is one. Otherwise the connection is set up first
+// (this can occur on restarts) with a context bounded by the RPC timeout, and the cache is read again.
+// A nil cache entry is treated as missing instead of being dereferenced. An error is returned when the
+// setup fails or when no client is cached for endpoint afterwards.
+func (dh *DeviceHandler) getChildAdapterServiceClient(endpoint string) (adapter_services.OnuInterAdapterServiceClient, error) {
+	// First check from cache
+	dh.lockChildAdapterClients.RLock()
+	cgClient, ok := dh.childAdapterClients[endpoint]
+	dh.lockChildAdapterClients.RUnlock()
+	if ok && cgClient != nil {
+		return cgClient.GetOnuInterAdapterServiceClient()
+	}
+
+	// Set the child connection - can occur on restarts
+	ctx, cancel := context.WithTimeout(context.Background(), dh.cfg.RPCTimeout)
+	err := dh.setupChildInterAdapterClient(ctx, endpoint)
+	cancel()
+	if err != nil {
+		return nil, err
+	}
+
+	// Get the child client now
+	dh.lockChildAdapterClients.RLock()
+	defer dh.lockChildAdapterClients.RUnlock()
+	if cgClient, ok = dh.childAdapterClients[endpoint]; ok && cgClient != nil {
+		return cgClient.GetOnuInterAdapterServiceClient()
+	}
+	return nil, fmt.Errorf("no-client-for-endpoint-%s", endpoint)
+}
+
+// deleteAdapterClients stops every cached child adapter client and removes it from the cache, under the write lock.
+func (dh *DeviceHandler) deleteAdapterClients(ctx context.Context) {
+	dh.lockChildAdapterClients.Lock()
+	defer dh.lockChildAdapterClients.Unlock()
+	for endpoint := range dh.childAdapterClients {
+		dh.childAdapterClients[endpoint].Stop(ctx)
+		delete(dh.childAdapterClients, endpoint)
+	}
+}
+// onuAdapterRestarted is the restart handler handed to vgrpc.NewClient for child adapter clients; it only logs a warning.
+// TODO: Any action the adapter needs to do following an ONU adapter restart?
+func (dh *DeviceHandler) onuAdapterRestarted(ctx context.Context, endPoint string) error {
+	logger.Warnw(ctx, "onu-adapter-restarted", log.Fields{"endpoint": endPoint})
+	return nil
+}
+// setAndTestAdapterServiceHandler builds the ONU inter-adapter client on conn and returns it only if the remote reports HEALTHY, else nil
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	client := adapter_services.NewOnuInterAdapterServiceClient(conn)
+	if health, err := client.GetHealthStatus(ctx, &empty.Empty{}); err == nil && health.State == voltha.HealthStatus_HEALTHY {
+		return client
+	}
+	return nil
+}