[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index b79a591..bd0efd2 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -27,18 +27,18 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v6/pkg/meters"
+ "github.com/opencord/voltha-lib-go/v7/pkg/meters"
- "github.com/opencord/voltha-lib-go/v6/pkg/flows"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v6/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v7/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-protos/v4/go/common"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ openoltpb2 "github.com/opencord/voltha-protos/v5/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"google.golang.org/grpc/codes"
@@ -1719,7 +1719,7 @@
"intf-id": intfID,
"device-id": f.deviceHandler.device.Id}, err)
}
- onuDev = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuDevice.ProxyAddress.OnuId, onuDevice.ProxyAddress.ChannelId, onuDevice.ProxyAddress.DeviceId, false)
+ onuDev = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuDevice.ProxyAddress.OnuId, onuDevice.ProxyAddress.ChannelId, onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint)
//better to ad the device to cache here.
f.deviceHandler.StoreOnuDevice(onuDev.(*OnuDevice))
} else {
@@ -1770,32 +1770,31 @@
return err
}
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
- logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
- log.Fields{
- "msg": *delGemPortMsg,
- "device-id": f.deviceHandler.device.Id})
- if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
- delGemPortMsg,
- ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST,
- f.deviceHandler.openOLT.config.Topic,
- onuDev.deviceType,
- onuDev.deviceID,
- onuDev.proxyDeviceID, ""); sendErr != nil {
+ delGemPortMsg := &ic.DeleteGemPortMessage{
+ DeviceId: onuDev.deviceID,
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ GemPortId: gemPortID,
+ }
+ logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter", log.Fields{"msg": *delGemPortMsg, "child-device-id": onuDev.deviceID})
+
+ if err := f.deviceHandler.sendDeleteGemPortToChildAdapter(ctx, onuDev.adapterEndpoint, delGemPortMsg); err != nil {
return olterrors.NewErrCommunication("send-delete-gem-port-to-onu-adapter",
log.Fields{
- "from-adapter": f.deviceHandler.openOLT.config.Topic,
- "to-adapter": onuDev.deviceType,
+ "from-adapter": f.deviceHandler.openOLT.config.AdapterEndpoint,
+ "to-adapter": onuDev.adapterEndpoint,
"onu-id": onuDev.deviceID,
"proxyDeviceID": onuDev.proxyDeviceID,
- "device-id": f.deviceHandler.device.Id}, sendErr)
+ "device-id": f.deviceHandler.device.Id}, err)
}
+
logger.Infow(ctx, "success-sending-del-gem-port-to-onu-adapter",
log.Fields{
- "msg": delGemPortMsg,
- "from-adapter": f.deviceHandler.device.Type,
- "to-adapter": onuDev.deviceType,
- "device-id": f.deviceHandler.device.Id})
+ "msg": delGemPortMsg,
+ "from-adapter": f.deviceHandler.device.Type,
+ "to-adapter": onuDev.deviceType,
+ "device-id": f.deviceHandler.device.Id,
+ "child-device-id": onuDev.deviceID})
return nil
}
@@ -1811,29 +1810,33 @@
return err
}
- delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
+ delTcontMsg := &ic.DeleteTcontMessage{
+ DeviceId: onuDev.deviceID,
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ AllocId: allocID,
+ }
+
logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
log.Fields{
"msg": *delTcontMsg,
"device-id": f.deviceHandler.device.Id})
- if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
- delTcontMsg,
- ic.InterAdapterMessageType_DELETE_TCONT_REQUEST,
- f.deviceHandler.openOLT.config.Topic,
- onuDev.deviceType,
- onuDev.deviceID,
- onuDev.proxyDeviceID, ""); sendErr != nil {
+
+ if err := f.deviceHandler.sendDeleteTContToChildAdapter(ctx, onuDev.adapterEndpoint, delTcontMsg); err != nil {
return olterrors.NewErrCommunication("send-delete-tcont-to-onu-adapter",
log.Fields{
- "from-adapter": f.deviceHandler.openOLT.config.Topic,
- "to-adapter": onuDev.deviceType, "onu-id": onuDev.deviceID,
+ "from-adapter": f.deviceHandler.openOLT.config.AdapterEndpoint,
+ "to-adapter": onuDev.adapterEndpoint,
+ "onu-id": onuDev.deviceID,
"proxyDeviceID": onuDev.proxyDeviceID,
- "device-id": f.deviceHandler.device.Id}, sendErr)
+ "device-id": f.deviceHandler.device.Id}, err)
+
}
logger.Infow(ctx, "success-sending-del-tcont-to-onu-adapter",
log.Fields{
- "msg": delTcontMsg,
- "device-id": f.deviceHandler.device.Id})
+ "msg": delTcontMsg,
+ "device-id": f.deviceHandler.device.Id,
+ "child-device-id": onuDev.deviceID})
return nil
}
@@ -1977,7 +1980,8 @@
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.AllocId})
+ "alloc-id": techprofileInst.AllocId,
+ "error": err})
}
default:
logger.Errorw(ctx, "error-unknown-tech",
@@ -2178,7 +2182,7 @@
f.incomingFlows[onuID] <- flowCb
// Wait on the channel for flow handlers return value
err := <-errChan
- logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
return err
}
logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
@@ -2420,26 +2424,22 @@
logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
- tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+ tpDownloadMsg := &ic.TechProfileDownloadMessage{
+ DeviceId: onuDev.deviceID,
UniId: uniID,
TpInstancePath: tpPath,
- TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+ TechTpInstance: &ic.TechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
}
logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
- sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
- tpDownloadMsg,
- ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
- f.deviceHandler.openOLT.config.Topic,
- onuDev.deviceType,
- onuDev.deviceID,
- onuDev.proxyDeviceID, "")
- if sendErr != nil {
+
+ err = f.deviceHandler.sendDownloadTechProfileToChildAdapter(ctx, onuDev.adapterEndpoint, tpDownloadMsg)
+ if err != nil {
return olterrors.NewErrCommunication("send-techprofile-download-request",
log.Fields{
- "from-adapter": f.deviceHandler.openOLT.config.Topic,
+ "from-adapter": f.deviceHandler.openOLT.config.AdapterEndpoint,
"to-adapter": onuDev.deviceType,
"onu-id": onuDev.deviceID,
- "proxyDeviceID": onuDev.proxyDeviceID}, sendErr)
+ "proxyDeviceID": onuDev.proxyDeviceID}, err)
}
logger.Infow(ctx, "success-sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
return nil
@@ -3351,29 +3351,38 @@
return nil
}
-func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, uniID uint32, onuDeviceID string) (*ic.TechProfileDownloadMessage, error) {
tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
if err != nil {
logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
- return nil
+ return nil, err
}
switch tpInst := tpInst.(type) {
case *tp_pb.TechProfileInstance:
- logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
- return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ logger.Debugw(ctx, "fetched-tp-instance-successfully-formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+ return &ic.TechProfileDownloadMessage{
+ DeviceId: onuDeviceID,
+ UniId: uniID,
TpInstancePath: tpPath,
- TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
- }
+ TechTpInstance: &ic.TechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
+ }, nil
case *openoltpb2.EponTechProfileInstance:
- return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ return &ic.TechProfileDownloadMessage{
+ DeviceId: onuDeviceID,
+ UniId: uniID,
TpInstancePath: tpPath,
- TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
- }
+ TechTpInstance: &ic.TechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+ }, nil
default:
logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
}
- return nil
+ return &ic.TechProfileDownloadMessage{
+ DeviceId: onuDeviceID,
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: nil,
+ }, nil
}
func (f *OpenOltFlowMgr) getOnuGemInfoList(ctx context.Context) []rsrcMgr.OnuGemInfo {