[VOL-4292] OpenOLT Adapter changes for gRPC migration

Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b79a591..bd0efd2 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -27,18 +27,18 @@
 	"sync"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v6/pkg/meters"
+	"github.com/opencord/voltha-lib-go/v7/pkg/meters"
 
-	"github.com/opencord/voltha-lib-go/v6/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	tp "github.com/opencord/voltha-lib-go/v6/pkg/techprofile"
+	"github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	tp "github.com/opencord/voltha-lib-go/v7/pkg/techprofile"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
-	"github.com/opencord/voltha-protos/v4/go/common"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	openoltpb2 "github.com/opencord/voltha-protos/v5/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"google.golang.org/grpc/codes"
@@ -1719,7 +1719,7 @@
 					"intf-id":   intfID,
 					"device-id": f.deviceHandler.device.Id}, err)
 		}
-		onuDev = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuDevice.ProxyAddress.OnuId, onuDevice.ProxyAddress.ChannelId, onuDevice.ProxyAddress.DeviceId, false)
+		onuDev = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuDevice.ProxyAddress.OnuId, onuDevice.ProxyAddress.ChannelId, onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint)
 		//better to ad the device to cache here.
 		f.deviceHandler.StoreOnuDevice(onuDev.(*OnuDevice))
 	} else {
@@ -1770,32 +1770,31 @@
 		return err
 	}
 
-	delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
-	logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
-		log.Fields{
-			"msg":       *delGemPortMsg,
-			"device-id": f.deviceHandler.device.Id})
-	if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
-		delGemPortMsg,
-		ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST,
-		f.deviceHandler.openOLT.config.Topic,
-		onuDev.deviceType,
-		onuDev.deviceID,
-		onuDev.proxyDeviceID, ""); sendErr != nil {
+	delGemPortMsg := &ic.DeleteGemPortMessage{
+		DeviceId:       onuDev.deviceID,
+		UniId:          uniID,
+		TpInstancePath: tpPath,
+		GemPortId:      gemPortID,
+	}
+	logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter", log.Fields{"msg": *delGemPortMsg, "child-device-id": onuDev.deviceID})
+
+	if err := f.deviceHandler.sendDeleteGemPortToChildAdapter(ctx, onuDev.adapterEndpoint, delGemPortMsg); err != nil {
 		return olterrors.NewErrCommunication("send-delete-gem-port-to-onu-adapter",
 			log.Fields{
-				"from-adapter":  f.deviceHandler.openOLT.config.Topic,
-				"to-adapter":    onuDev.deviceType,
+				"from-adapter":  f.deviceHandler.openOLT.config.AdapterEndpoint,
+				"to-adapter":    onuDev.adapterEndpoint,
 				"onu-id":        onuDev.deviceID,
 				"proxyDeviceID": onuDev.proxyDeviceID,
-				"device-id":     f.deviceHandler.device.Id}, sendErr)
+				"device-id":     f.deviceHandler.device.Id}, err)
 	}
+
 	logger.Infow(ctx, "success-sending-del-gem-port-to-onu-adapter",
 		log.Fields{
-			"msg":          delGemPortMsg,
-			"from-adapter": f.deviceHandler.device.Type,
-			"to-adapter":   onuDev.deviceType,
-			"device-id":    f.deviceHandler.device.Id})
+			"msg":             delGemPortMsg,
+			"from-adapter":    f.deviceHandler.device.Type,
+			"to-adapter":      onuDev.deviceType,
+			"device-id":       f.deviceHandler.device.Id,
+			"child-device-id": onuDev.deviceID})
 	return nil
 }
 
@@ -1811,29 +1810,33 @@
 		return err
 	}
 
-	delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
+	delTcontMsg := &ic.DeleteTcontMessage{
+		DeviceId:       onuDev.deviceID,
+		UniId:          uniID,
+		TpInstancePath: tpPath,
+		AllocId:        allocID,
+	}
+
 	logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
 		log.Fields{
 			"msg":       *delTcontMsg,
 			"device-id": f.deviceHandler.device.Id})
-	if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
-		delTcontMsg,
-		ic.InterAdapterMessageType_DELETE_TCONT_REQUEST,
-		f.deviceHandler.openOLT.config.Topic,
-		onuDev.deviceType,
-		onuDev.deviceID,
-		onuDev.proxyDeviceID, ""); sendErr != nil {
+
+	if err := f.deviceHandler.sendDeleteTContToChildAdapter(ctx, onuDev.adapterEndpoint, delTcontMsg); err != nil {
 		return olterrors.NewErrCommunication("send-delete-tcont-to-onu-adapter",
 			log.Fields{
-				"from-adapter": f.deviceHandler.openOLT.config.Topic,
-				"to-adapter":   onuDev.deviceType, "onu-id": onuDev.deviceID,
+				"from-adapter":  f.deviceHandler.openOLT.config.AdapterEndpoint,
+				"to-adapter":    onuDev.adapterEndpoint,
+				"onu-id":        onuDev.deviceID,
 				"proxyDeviceID": onuDev.proxyDeviceID,
-				"device-id":     f.deviceHandler.device.Id}, sendErr)
+				"device-id":     f.deviceHandler.device.Id}, err)
+
 	}
 	logger.Infow(ctx, "success-sending-del-tcont-to-onu-adapter",
 		log.Fields{
-			"msg":       delTcontMsg,
-			"device-id": f.deviceHandler.device.Id})
+			"msg":             delTcontMsg,
+			"device-id":       f.deviceHandler.device.Id,
+			"child-device-id": onuDev.deviceID})
 	return nil
 }
 
@@ -1977,7 +1980,8 @@
 					"onu-id":    onuID,
 					"uni-id":    uniID,
 					"device-id": f.deviceHandler.device.Id,
-					"alloc-id":  techprofileInst.AllocId})
+					"alloc-id":  techprofileInst.AllocId,
+					"error":     err})
 		}
 	default:
 		logger.Errorw(ctx, "error-unknown-tech",
@@ -2178,7 +2182,7 @@
 		f.incomingFlows[onuID] <- flowCb
 		// Wait on the channel for flow handlers return value
 		err := <-errChan
-		logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+		logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
 		return err
 	}
 	logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
@@ -2420,26 +2424,22 @@
 	logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
 
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
-	tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+	tpDownloadMsg := &ic.TechProfileDownloadMessage{
+		DeviceId:       onuDev.deviceID,
 		UniId:          uniID,
 		TpInstancePath: tpPath,
-		TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+		TechTpInstance: &ic.TechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
 	}
 	logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
-	sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
-		tpDownloadMsg,
-		ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
-		f.deviceHandler.openOLT.config.Topic,
-		onuDev.deviceType,
-		onuDev.deviceID,
-		onuDev.proxyDeviceID, "")
-	if sendErr != nil {
+
+	err = f.deviceHandler.sendDownloadTechProfileToChildAdapter(ctx, onuDev.adapterEndpoint, tpDownloadMsg)
+	if err != nil {
 		return olterrors.NewErrCommunication("send-techprofile-download-request",
 			log.Fields{
-				"from-adapter":  f.deviceHandler.openOLT.config.Topic,
+				"from-adapter":  f.deviceHandler.openOLT.config.AdapterEndpoint,
 				"to-adapter":    onuDev.deviceType,
 				"onu-id":        onuDev.deviceID,
-				"proxyDeviceID": onuDev.proxyDeviceID}, sendErr)
+				"proxyDeviceID": onuDev.proxyDeviceID}, err)
 	}
 	logger.Infow(ctx, "success-sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
 	return nil
@@ -3351,29 +3351,38 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, uniID uint32, onuDeviceID string) (*ic.TechProfileDownloadMessage, error) {
 	tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
 	if err != nil {
 		logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
-		return nil
+		return nil, err
 	}
 
 	switch tpInst := tpInst.(type) {
 	case *tp_pb.TechProfileInstance:
-		logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
-		return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+		logger.Debugw(ctx, "fetched-tp-instance-successfully-formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+		return &ic.TechProfileDownloadMessage{
+			DeviceId:       onuDeviceID,
+			UniId:          uniID,
 			TpInstancePath: tpPath,
-			TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
-		}
+			TechTpInstance: &ic.TechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
+		}, nil
 	case *openoltpb2.EponTechProfileInstance:
-		return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+		return &ic.TechProfileDownloadMessage{
+			DeviceId:       onuDeviceID,
+			UniId:          uniID,
 			TpInstancePath: tpPath,
-			TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
-		}
+			TechTpInstance: &ic.TechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+		}, nil
 	default:
 		logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
 	}
-	return nil
+	return &ic.TechProfileDownloadMessage{
+		DeviceId:       onuDeviceID,
+		UniId:          uniID,
+		TpInstancePath: tpPath,
+		TechTpInstance: nil,
+	}, nil
 }
 
 func (f *OpenOltFlowMgr) getOnuGemInfoList(ctx context.Context) []rsrcMgr.OnuGemInfo {