[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 3dc62b4..c0e8cc8 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -30,26 +30,29 @@
"sync"
"time"
+ "github.com/golang/protobuf/ptypes/empty"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+
"github.com/cenkalti/backoff/v3"
"github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v6/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v6/pkg/config"
- "github.com/opencord/voltha-lib-go/v6/pkg/events/eventif"
- flow_utils "github.com/opencord/voltha-lib-go/v6/pkg/flows"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- "github.com/opencord/voltha-lib-go/v6/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+ flow_utils "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/pmmetrics"
+ conf "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/extension"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- of "github.com/opencord/voltha-protos/v4/go/openflow_13"
- oop "github.com/opencord/voltha-protos/v4/go/openolt"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/v5/go/openolt"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -68,21 +71,23 @@
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
- cm *config.ConfigManager
- device *voltha.Device
- coreProxy adapterif.CoreProxy
- AdapterProxy adapterif.AdapterProxy
- EventProxy eventif.EventProxy
- openOLT *OpenOLT
- exitChannel chan int
- lockDevice sync.RWMutex
- Client oop.OpenoltClient
- transitionMap *TransitionMap
- clientCon *grpc.ClientConn
- flowMgr []*OpenOltFlowMgr
- groupMgr *OpenOltGroupMgr
- eventMgr *OpenOltEventMgr
- resourceMgr []*rsrcMgr.OpenOltResourceMgr
+ cm *config.ConfigManager
+ device *voltha.Device
+ cfg *conf.AdapterFlags
+ coreClient *vgrpc.Client
+ childAdapterClients map[string]*vgrpc.Client
+ lockChildAdapterClients sync.RWMutex
+ EventProxy eventif.EventProxy
+ openOLT *OpenOLT
+ exitChannel chan int
+ lockDevice sync.RWMutex
+ Client oop.OpenoltClient
+ transitionMap *TransitionMap
+ clientCon *grpc.ClientConn
+ flowMgr []*OpenOltFlowMgr
+ groupMgr *OpenOltGroupMgr
+ eventMgr *OpenOltEventMgr
+ resourceMgr []*rsrcMgr.OpenOltResourceMgr
deviceInfo *oop.DeviceInfo
@@ -112,14 +117,15 @@
//OnuDevice represents ONU related info
type OnuDevice struct {
- deviceID string
- deviceType string
- serialNumber string
- onuID uint32
- intfID uint32
- proxyDeviceID string
- losRaised bool
- rdiRaised bool
+ deviceID string
+ deviceType string
+ serialNumber string
+ onuID uint32
+ intfID uint32
+ proxyDeviceID string
+ losRaised bool
+ rdiRaised bool
+ adapterEndpoint string
}
type onuIndicationMsg struct {
@@ -156,7 +162,7 @@
}
//NewOnuDevice creates a new Onu Device
-func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string, losRaised bool) *OnuDevice {
+func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string, losRaised bool, adapterEndpoint string) *OnuDevice {
var device OnuDevice
device.deviceID = devID
device.deviceType = deviceTp
@@ -165,15 +171,15 @@
device.intfID = intfID
device.proxyDeviceID = proxyDevID
device.losRaised = losRaised
+ device.adapterEndpoint = adapterEndpoint
return &device
}
//NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager) *DeviceHandler {
+func NewDeviceHandler(cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager, cfg *conf.AdapterFlags) *DeviceHandler {
var dh DeviceHandler
dh.cm = cm
- dh.coreProxy = cp
- dh.AdapterProxy = ap
+ dh.coreClient = cc
dh.EventProxy = ep
cloned := (proto.Clone(device)).(*voltha.Device)
dh.device = cloned
@@ -186,6 +192,8 @@
dh.activePorts = sync.Map{}
dh.stopIndications = make(chan bool, 1)
dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
+ dh.childAdapterClients = make(map[string]*vgrpc.Client)
+ dh.cfg = cfg
// Create a slice of buffered channels for handling concurrent mcast flow/group.
dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
@@ -219,6 +227,10 @@
defer dh.lockDevice.Unlock()
logger.Debug(ctx, "stopping-device-agent")
dh.exitChannel <- 1
+
+ // Stop the adapter gRPC clients for that parent device
+ dh.deleteAdapterClients(ctx)
+
logger.Debug(ctx, "device-agent-stopped")
}
@@ -319,9 +331,19 @@
return olterrors.NewErrNotFound("port-label", log.Fields{"port-number": portNum, "port-type": portType}, err)
}
- if port, err := dh.coreProxy.GetDevicePort(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portNum); err == nil && port.Type == portType {
+ // Check if port exists
+ port, err := dh.getPortFromCore(ctx, &ic.PortFilter{
+ DeviceId: dh.device.Id,
+ Port: portNum,
+ })
+ if err == nil && port.Type == portType {
logger.Debug(ctx, "port-already-exists-updating-oper-status-of-port")
- if err := dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portType, portNum, operStatus); err != nil {
+ err = dh.updatePortStateInCore(ctx, &ic.PortState{
+ DeviceId: dh.device.Id,
+ PortType: portType,
+ PortNo: portNum,
+ OperStatus: operStatus})
+ if err != nil {
return olterrors.NewErrAdapter("failed-to-update-port-state", log.Fields{
"device-id": dh.device.Id,
"port-type": portType,
@@ -330,9 +352,11 @@
}
return nil
}
+
// Now create Port
capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
- port := &voltha.Port{
+ port = &voltha.Port{
+ DeviceId: dh.device.Id,
PortNo: portNum,
Label: label,
Type: portType,
@@ -350,7 +374,8 @@
}
logger.Debugw(ctx, "sending-port-update-to-core", log.Fields{"port": port})
// Synchronous call to update device - this method is run in its own go routine
- if err := dh.coreProxy.PortCreated(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, port); err != nil {
+ err = dh.createPortInCore(ctx, port)
+ if err != nil {
return olterrors.NewErrAdapter("error-creating-port", log.Fields{
"device-id": dh.device.Id,
"port-type": portType}, err)
@@ -360,7 +385,7 @@
}
func (dh *DeviceHandler) updateLocalDevice(ctx context.Context) {
- device, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, dh.device.Id)
+ device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
if err != nil || device == nil {
logger.Errorf(ctx, "device-not-found", log.Fields{"device-id": dh.device.Id}, err)
return
@@ -658,9 +683,12 @@
}
// Synchronous call to update device state - this method is run in its own go routine
- if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
- voltha.OperStatus_ACTIVE); err != nil {
- return olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.device.Id,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ ConnStatus: voltha.ConnectStatus_REACHABLE,
+ }); err != nil {
+ return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err)
}
//Clear olt communication failure event
@@ -688,7 +716,7 @@
func (dh *DeviceHandler) doStateDown(ctx context.Context) error {
logger.Debugw(ctx, "do-state-down-start", log.Fields{"device-id": dh.device.Id})
- device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+ device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
return olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err)
@@ -702,12 +730,16 @@
dh.device = cloned
dh.lockDevice.Unlock()
- if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: cloned.Id,
+ OperStatus: cloned.OperStatus,
+ ConnStatus: cloned.ConnectStatus,
+ }); err != nil {
return olterrors.NewErrAdapter("state-update-failed", log.Fields{"device-id": device.Id}, err)
}
//get the child device for the parent device
- onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
+ onuDevices, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
if err != nil {
return olterrors.NewErrAdapter("child-device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err)
}
@@ -715,11 +747,20 @@
// Update onu state as down in onu adapter
onuInd := oop.OnuIndication{}
onuInd.OperState = "down"
- err := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
- dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+
+ ogClient, err := dh.getChildAdapterServiceClient(onuDevice.AdapterEndpoint)
+ if err != nil {
+ return err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ _, err = ogClient.OnuIndication(subCtx, &ic.OnuIndicationMessage{
+ DeviceId: onuDevice.Id,
+ OnuIndication: &onuInd,
+ })
+ cancel()
if err != nil {
_ = olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
- "source": dh.openOLT.config.Topic,
+ "source": dh.openOLT.config.AdapterEndpoint,
"onu-indicator": onuInd,
"device-type": onuDevice.Type,
"device-id": onuDevice.Id}, err).LogAt(log.ErrorLevel)
@@ -783,7 +824,7 @@
logger.Debugw(ctx, "olt-device-connected", log.Fields{"device-id": dh.device.Id})
// Case where OLT is disabled and then rebooted.
- device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+ device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
return olterrors.NewErrAdapter("device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
@@ -795,7 +836,12 @@
cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
cloned.OperStatus = voltha.OperStatus_UNKNOWN
dh.device = cloned
- if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+
+ if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: cloned.Id,
+ OperStatus: cloned.OperStatus,
+ ConnStatus: cloned.ConnectStatus,
+ }); err != nil {
return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
@@ -824,13 +870,13 @@
return nil
}
- ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
+ ports, err := dh.listDevicePortsFromCore(ctx, dh.device.Id)
if err != nil {
/*TODO: needs to handle error scenarios */
return olterrors.NewErrAdapter("fetch-ports-failed", log.Fields{"device-id": dh.device.Id}, err)
}
- dh.populateActivePorts(ctx, ports)
- if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
+ dh.populateActivePorts(ctx, ports.Items)
+ if err := dh.disableAdminDownPorts(ctx, ports.Items); err != nil {
return olterrors.NewErrAdapter("port-status-update-failed", log.Fields{"ports": ports}, err)
}
@@ -932,7 +978,7 @@
}
// Synchronous call to update device - this method is run in its own go routine
- if err := dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device); err != nil {
+ if err = dh.updateDeviceInCore(ctx, dh.device); err != nil {
return nil, olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
}
@@ -948,12 +994,12 @@
return
case <-time.After(time.Duration(dh.metrics.ToPmConfigs().DefaultFreq) * time.Second):
- ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
+ ports, err := dh.listDevicePortsFromCore(ctx, dh.device.Id)
if err != nil {
logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "err": err})
continue
}
- for _, port := range ports {
+ for _, port := range ports.Items {
// NNI Stats
if port.Type == voltha.Port_ETHERNET_NNI {
intfID := PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)
@@ -989,7 +1035,14 @@
dh.transitionMap.Handle(ctx, DeviceInit)
// Now, set the initial PM configuration for that device
- if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
+ cgClient, err := dh.coreClient.GetCoreServiceClient()
+ if err != nil {
+ logger.Errorw(ctx, "no-core-connection", log.Fields{"device-id": dh.device.Id, "error": err})
+ return
+ }
+
+ // Push that PM configuration to the core through the client obtained above
+ if _, err := cgClient.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
_ = olterrors.NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
}
}
@@ -1014,20 +1067,21 @@
}, nil
}
-// GetInterAdapterTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
-func (dh *DeviceHandler) GetInterAdapterTechProfileDownloadMessage(ctx context.Context, tpPath string, ponPortNum uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
- ifID, err := IntfIDFromPonPortNum(ctx, ponPortNum)
+// GetTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
+func (dh *DeviceHandler) GetTechProfileDownloadMessage(ctx context.Context, request *ic.TechProfileInstanceRequestMessage) (*ic.TechProfileDownloadMessage, error) {
+ ifID, err := IntfIDFromPonPortNum(ctx, request.ParentPonPort)
if err != nil {
- return nil
+ return nil, err
}
- return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, tpPath, ifID, onuID, uniID)
+ return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, request.TpInstancePath, request.OnuId, request.DeviceId)
}
func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
- logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
+ logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "parent-device-id": dh.device.Id})
var deviceType string
var deviceID string
var proxyDeviceID string
+ var childAdapterEndpoint string
transid := extractOmciTransactionID(omciInd.Pkt)
if logger.V(log.DebugLevel) {
@@ -1041,11 +1095,12 @@
logger.Debugw(ctx, "omci-indication-for-a-device-not-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
- kwargs := make(map[string]interface{})
- kwargs["onu_id"] = omciInd.OnuId
- kwargs["parent_port_no"] = ponPort
- onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+ onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+ ParentId: dh.device.Id,
+ OnuId: omciInd.OnuId,
+ ParentPortNo: ponPort,
+ })
if err != nil {
return olterrors.NewErrNotFound("onu", log.Fields{
"intf-id": omciInd.IntfId,
@@ -1054,81 +1109,73 @@
deviceType = onuDevice.Type
deviceID = onuDevice.Id
proxyDeviceID = onuDevice.ProxyAddress.DeviceId
+ childAdapterEndpoint = onuDevice.AdapterEndpoint
//if not exist in cache, then add to cache.
- dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID, false))
+ dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID, false, onuDevice.AdapterEndpoint))
} else {
//found in cache
logger.Debugw(ctx, "omci-indication-for-a-device-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
deviceType = onuInCache.(*OnuDevice).deviceType
deviceID = onuInCache.(*OnuDevice).deviceID
proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
+ childAdapterEndpoint = onuInCache.(*OnuDevice).adapterEndpoint
}
- omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
- if err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx), omciMsg,
- ic.InterAdapterMessageType_OMCI_RESPONSE, dh.openOLT.config.Topic, deviceType,
- deviceID, proxyDeviceID, ""); err != nil {
+ if err := dh.sendOmciIndicationToChildAdapter(ctx, childAdapterEndpoint, &ic.OmciMessage{
+ ParentDeviceId: proxyDeviceID,
+ ChildDeviceId: deviceID,
+ Message: omciInd.Pkt,
+ }); err != nil {
return olterrors.NewErrCommunication("omci-request", log.Fields{
- "source": dh.openOLT.config.Topic,
- "destination": deviceType,
+ "source": dh.openOLT.config.AdapterEndpoint,
+ "device-type": deviceType,
+ "destination": childAdapterEndpoint,
"onu-id": deviceID,
"proxy-device-id": proxyDeviceID}, err)
}
return nil
}
-//ProcessInterAdapterMessage sends the proxied messages to the target device
-// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
-// is meant, and then send the unmarshalled omci message to this onu
-func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
- logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
- if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
- return dh.handleInterAdapterOmciMsg(ctx, msg)
- }
- return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
-}
+// //ProcessInterAdapterMessage sends the proxied messages to the target device
+// // If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
+// // is meant, and then send the unmarshalled omci message to this onu
+// func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
+// logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
+// if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
+// return dh.handleInterAdapterOmciMsg(ctx, msg)
+// }
+// return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+// }
-func (dh *DeviceHandler) handleInterAdapterOmciMsg(ctx context.Context, msg *ic.InterAdapterMessage) error {
- msgID := msg.Header.Id
- fromTopic := msg.Header.FromTopic
- toTopic := msg.Header.ToTopic
- toDeviceID := msg.Header.ToDeviceId
- proxyDeviceID := msg.Header.ProxyDeviceId
-
- logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
-
- msgBody := msg.GetBody()
-
- omciMsg := &ic.InterAdapterOmciMessage{}
- if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
- return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
- }
+// ProxyOmciMessage sends the proxied OMCI message to the target device
+func (dh *DeviceHandler) ProxyOmciMessage(ctx context.Context, omciMsg *ic.OmciMessage) error {
+ logger.Debugw(ctx, "proxy-omci-message", log.Fields{"parent-device-id": omciMsg.ParentDeviceId, "child-device-id": omciMsg.ChildDeviceId, "proxy-address": omciMsg.ProxyAddress, "connect-status": omciMsg.ConnectStatus})
if omciMsg.GetProxyAddress() == nil {
- onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
+ onuDevice, err := dh.getDeviceFromCore(ctx, omciMsg.ChildDeviceId)
if err != nil {
return olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
+ "parent-device-id": dh.device.Id,
+ "child-device-id": omciMsg.ChildDeviceId}, err)
}
- logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedOmciMessage(ctx, onuDevice, omciMsg); err != nil {
+ logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"onu-device-proxy-address": onuDevice.ProxyAddress})
+ if err := dh.sendProxiedMessage(log.WithSpanFromContext(context.Background(), ctx), onuDevice, omciMsg); err != nil {
return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
+ "parent-device-id": dh.device.Id,
+ "child-device-id": omciMsg.ChildDeviceId}, err)
}
} else {
- logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedOmciMessage(ctx, nil, omciMsg); err != nil {
+ logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"onu-device-proxy-address": omciMsg.ProxyAddress})
+ if err := dh.sendProxiedMessage(log.WithSpanFromContext(context.Background(), ctx), nil, omciMsg); err != nil {
return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
+ "parent-device-id": dh.device.Id,
+ "child-device-id": omciMsg.ChildDeviceId}, err)
}
}
return nil
}
-func (dh *DeviceHandler) sendProxiedOmciMessage(ctx context.Context, onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) error {
+func (dh *DeviceHandler) sendProxiedMessage(ctx context.Context, onuDevice *voltha.Device, omciMsg *ic.OmciMessage) error {
var intfID uint32
var onuID uint32
var connectStatus common.ConnectStatus_Types
@@ -1200,13 +1247,6 @@
sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
- kwargs := make(map[string]interface{})
- if sn != "" {
- kwargs["serial_number"] = sn
- } else {
- return olterrors.NewErrInvalidValue(log.Fields{"serial-number": sn}, nil)
- }
-
var alarmInd oop.OnuAlarmIndication
raisedTs := time.Now().Unix()
if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
@@ -1244,7 +1284,10 @@
// check the ONU is already know to the OLT
// NOTE the second time the ONU is discovered this should return a device
- onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
+ onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+ ParentId: dh.device.Id,
+ SerialNumber: sn,
+ })
if err != nil {
logger.Debugw(ctx, "core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
@@ -1280,8 +1323,14 @@
"serial-number": sn}, err)
}
- if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
- "", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
+ if onuDevice, err = dh.sendChildDeviceDetectedToCore(ctx, &ic.DeviceDiscovery{
+ ParentId: dh.device.Id,
+ ParentPortNo: parentPortNo,
+ ChannelId: channelID,
+ VendorId: string(onuDiscInd.SerialNumber.GetVendorId()),
+ SerialNumber: sn,
+ OnuId: onuID,
+ }); err != nil {
dh.discOnus.Delete(sn)
dh.resourceMgr[ponintfid].FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
@@ -1298,6 +1347,14 @@
"device-id": dh.device.Id})
}
+ // Set up the gRPC connection to the adapter responsible for that onuDevice, if it is not set up yet
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ err = dh.setupChildInterAdapterClient(subCtx, onuDevice.AdapterEndpoint)
+ cancel()
+ if err != nil {
+ return olterrors.NewErrCommunication("no-connection-to-child-adapter", log.Fields{"device-id": onuDevice.Id}, err)
+ }
+
// we can now use the existing ONU Id
onuID = onuDevice.ProxyAddress.OnuId
//Insert the ONU into cache to use in OnuIndication.
@@ -1308,17 +1365,23 @@
"sn": sn})
onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
- onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false)
+ onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint)
dh.onus.Store(onuKey, onuDev)
logger.Debugw(ctx, "new-onu-device-discovered",
log.Fields{"onu": onuDev,
"sn": sn})
- if err := dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: onuDevice.Id,
+ ParentDeviceId: dh.device.Id,
+ OperStatus: common.OperStatus_DISCOVERED,
+ ConnStatus: common.ConnectStatus_REACHABLE,
+ }); err != nil {
return olterrors.NewErrAdapter("failed-to-update-device-state", log.Fields{
"device-id": onuDevice.Id,
"serial-number": sn}, err)
}
+
logger.Infow(ctx, "onu-discovered-reachable", log.Fields{"device-id": onuDevice.Id, "sn": sn})
if err := dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn); err != nil {
return olterrors.NewErrAdapter("onu-activation-failed", log.Fields{
@@ -1330,7 +1393,6 @@
func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
- kwargs := make(map[string]interface{})
ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
var onuDevice *voltha.Device
var err error
@@ -1349,19 +1411,21 @@
//If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
foundInCache = true
errFields["onu-id"] = onuInCache.(*OnuDevice).deviceID
- onuDevice, err = dh.coreProxy.GetDevice(ctx, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
+ onuDevice, err = dh.getDeviceFromCore(ctx, onuInCache.(*OnuDevice).deviceID)
} else {
//If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
if serialNumber != "" {
- kwargs["serial_number"] = serialNumber
errFields["serial-number"] = serialNumber
} else {
- kwargs["onu_id"] = onuInd.OnuId
- kwargs["parent_port_no"] = ponPort
errFields["onu-id"] = onuInd.OnuId
errFields["parent-port-no"] = ponPort
}
- onuDevice, err = dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+ onuDevice, err = dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+ ParentId: dh.device.Id,
+ SerialNumber: serialNumber,
+ OnuId: onuInd.OnuId,
+ ParentPortNo: ponPort,
+ })
}
if err != nil || onuDevice == nil {
@@ -1383,7 +1447,7 @@
if !foundInCache {
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
- dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false))
+ dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false, onuDevice.AdapterEndpoint))
}
if onuInd.OperState == "down" && onuInd.FailReason != oop.OnuIndication_ONU_ACTIVATION_FAIL_REASON_NONE {
@@ -1412,27 +1476,17 @@
}
switch onuInd.OperState {
- case "down":
+ case "up", "down":
logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
- // TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
- err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
- dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+
+ err := dh.sendOnuIndicationToChildAdapter(ctx, onuDevice.AdapterEndpoint, &ic.OnuIndicationMessage{
+ DeviceId: onuDevice.Id,
+ OnuIndication: onuInd,
+ })
if err != nil {
return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
"onu-indicator": onuInd,
- "source": dh.openOLT.config.Topic,
- "device-type": onuDevice.Type,
- "device-id": onuDevice.Id}, err)
- }
- case "up":
- logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
- // TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
- err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
- dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
- if err != nil {
- return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
- "onu-indicator": onuInd,
- "source": dh.openOLT.config.Topic,
+ "source": dh.openOLT.config.AdapterEndpoint,
"device-type": onuDevice.Type,
"device-id": onuDevice.Id}, err)
}
@@ -1485,10 +1539,13 @@
log.Fields{"pon-port": parentPort,
"onu-id": onuID,
"device-id": dh.device.Id})
- kwargs := make(map[string]interface{})
- kwargs["onu_id"] = onuID
- kwargs["parent_port_no"] = parentPort
- onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
+
+ onuDevice, err := dh.getChildDeviceFromCore(ctx, &ic.ChildDeviceFilter{
+ ParentId: dh.device.Id,
+ OnuId: onuID,
+ ParentPortNo: parentPort,
+ })
+
if err != nil {
return nil, olterrors.NewErrNotFound("onu-device", log.Fields{
"intf-id": parentPort,
@@ -1509,7 +1566,12 @@
"device-id": dh.device.Id,
})
}
- if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPort, packetPayload); err != nil {
+
+ if err := dh.sendPacketToCore(ctx, &ic.PacketIn{
+ DeviceId: dh.device.Id,
+ Port: logicalPort,
+ Packet: packetPayload,
+ }); err != nil {
return olterrors.NewErrCommunication("packet-send-failed", log.Fields{
"source": "adapter",
"destination": "core",
@@ -1646,8 +1708,13 @@
cloned := proto.Clone(device).(*voltha.Device)
//Update device Admin state
dh.device = cloned
+
// Update the all pon ports state on that device to disable and NNI remains active as NNI remains active in openolt agent.
- if err := dh.coreProxy.PortsStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), cloned.Id, ^uint32(1<<voltha.Port_PON_OLT), voltha.OperStatus_UNKNOWN); err != nil {
+ if err := dh.updatePortsStateInCore(ctx, &ic.PortStateFilter{
+ DeviceId: cloned.Id,
+ PortTypeFilter: ^uint32(1 << voltha.Port_PON_OLT),
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ }); err != nil {
return olterrors.NewErrAdapter("ports-state-update-failed", log.Fields{"device-id": device.Id}, err)
}
logger.Debugw(ctx, "disable-device-end", log.Fields{"device-id": device.Id})
@@ -1658,18 +1725,21 @@
// Update onu state as unreachable in onu adapter
onuInd := oop.OnuIndication{}
onuInd.OperState = state
+
//get the child device for the parent device
- onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
+ onuDevices, err := dh.getChildDevicesFromCore(ctx, dh.device.Id)
if err != nil {
logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "err": err})
}
if onuDevices != nil {
for _, onuDevice := range onuDevices.Items {
- err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.TODO(), ctx), &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
- dh.openOLT.config.Topic, onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+ err := dh.sendOnuIndicationToChildAdapter(ctx, onuDevice.AdapterEndpoint, &ic.OnuIndicationMessage{
+ DeviceId: onuDevice.Id,
+ OnuIndication: &onuInd,
+ })
if err != nil {
logger.Errorw(ctx, "failed-to-send-inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": dh.openOLT.config.Topic, "DeviceType": onuDevice.Type, "device-id": onuDevice.Id})
+ "From Adapter": dh.openOLT.config.AdapterEndpoint, "DeviceType": onuDevice.Type, "device-id": onuDevice.Id})
}
}
@@ -1691,19 +1761,22 @@
logger.Debug(ctx, "olt-reenabled")
// Update the all ports state on that device to enable
-
- ports, err := dh.coreProxy.ListDevicePorts(ctx, device.Id)
+ ports, err := dh.listDevicePortsFromCore(ctx, device.Id)
if err != nil {
return olterrors.NewErrAdapter("list-ports-failed", log.Fields{"device-id": device.Id}, err)
}
- if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
+ if err := dh.disableAdminDownPorts(ctx, ports.Items); err != nil {
return olterrors.NewErrAdapter("port-status-update-failed-after-olt-reenable", log.Fields{"device": device}, err)
}
//Update the device oper status as ACTIVE
device.OperStatus = voltha.OperStatus_ACTIVE
dh.device = device
- if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), device.Id, device.ConnectStatus, device.OperStatus); err != nil {
+ if err := dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: device.Id,
+ OperStatus: device.OperStatus,
+ ConnStatus: device.ConnectStatus,
+ }); err != nil {
return olterrors.NewErrAdapter("state-update-failed", log.Fields{
"device-id": device.Id,
"connect-status": device.ConnectStatus,
@@ -1858,7 +1931,11 @@
})
}
- if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPortNum, packetIn.Pkt); err != nil {
+ if err := dh.sendPacketToCore(ctx, &ic.PacketIn{
+ DeviceId: dh.device.Id,
+ Port: logicalPortNum,
+ Packet: packetIn.Pkt,
+ }); err != nil {
return olterrors.NewErrCommunication("send-packet-in", log.Fields{
"destination": "core",
"source": dh.device.Type,
@@ -1877,7 +1954,7 @@
}
// PacketOut sends packet-out from VOLTHA to OLT on the egress port provided
-func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo int, packet *of.OfpPacketOut) error {
+func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo uint32, packet *of.OfpPacketOut) error {
if logger.V(log.DebugLevel) {
logger.Debugw(ctx, "incoming-packet-out", log.Fields{
"device-id": dh.device.Id,
@@ -2033,7 +2110,7 @@
}
func (dh *DeviceHandler) updateStateUnreachable(ctx context.Context) {
- device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+ device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
if err != nil || device == nil {
// One case where we have seen core returning an error for GetDevice call is after OLT device delete.
// After OLT delete, the adapter asks for OLT to reboot. When OLT is rebooted, shortly we loose heartbeat.
@@ -2048,18 +2125,29 @@
logger.Debugw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
"admin-state": device.AdminState, "oper-status": device.OperStatus})
if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
- if err = dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+ if err = dh.updateDeviceStateInCore(ctx, &ic.DeviceStateFilter{
+ DeviceId: dh.device.Id,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ ConnStatus: voltha.ConnectStatus_UNREACHABLE,
+ }); err != nil {
_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
- if err = dh.coreProxy.PortsStateUpdate(ctx, dh.device.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
+
+ if err = dh.updatePortsStateInCore(ctx, &ic.PortStateFilter{
+ DeviceId: dh.device.Id,
+ PortTypeFilter: 0,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ }); err != nil {
_ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
//raise olt communication failure event
raisedTs := time.Now().Unix()
- device.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
- device.OperStatus = voltha.OperStatus_UNKNOWN
- go dh.eventMgr.oltCommunicationEvent(ctx, device, raisedTs)
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ dh.device = cloned // update local copy of the device
+ go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
dh.cleanupDeviceResources(ctx)
// Stop the Stats collector
@@ -2141,7 +2229,12 @@
dh.activePorts.Store(ponID, false)
logger.Infow(ctx, "disabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
}
- if err := dh.coreProxy.PortStateUpdate(ctx, dh.device.Id, voltha.Port_PON_OLT, port.PortNo, operStatus); err != nil {
+ if err := dh.updatePortStateInCore(ctx, &ic.PortState{
+ DeviceId: dh.device.Id,
+ PortType: voltha.Port_PON_OLT,
+ PortNo: port.PortNo,
+ OperStatus: operStatus,
+ }); err != nil {
return olterrors.NewErrAdapter("port-state-update-failed", log.Fields{
"device-id": dh.device.Id,
"port": port.PortNo}, err)
@@ -2695,3 +2788,303 @@
return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_REASON_UNDEFINED)
}
+
+/*
+Helper functions to communicate with Core
+*/
+
+// getDeviceFromCore retrieves the device identified by deviceID from the core through the
+// core service gRPC client.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails.
+func (dh *DeviceHandler) getDeviceFromCore(ctx context.Context, deviceID string) (*voltha.Device, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// A missing client without an error must not be reported as (nil, nil).
+		return nil, fmt.Errorf("no-core-service-client-to-get-device-%s", deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.GetDevice(subCtx, &common.ID{Id: deviceID})
+}
+
+// getChildDeviceFromCore asks the core for the child device matching childDeviceFilter.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails.
+func (dh *DeviceHandler) getChildDeviceFromCore(ctx context.Context, childDeviceFilter *ic.ChildDeviceFilter) (*voltha.Device, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// A missing client without an error must not be reported as (nil, nil).
+		return nil, fmt.Errorf("no-core-service-client-to-get-child-device")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.GetChildDevice(subCtx, childDeviceFilter)
+}
+
+// updateDeviceStateInCore sends deviceStateFilter to the core's DeviceStateUpdate RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) updateDeviceStateInCore(ctx context.Context, deviceStateFilter *ic.DeviceStateFilter) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-update-device-state")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceStateUpdate(subCtx, deviceStateFilter)
+	return err
+}
+
+// getChildDevicesFromCore asks the core for the child devices of the device identified by
+// deviceID.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails.
+func (dh *DeviceHandler) getChildDevicesFromCore(ctx context.Context, deviceID string) (*voltha.Devices, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// A missing client without an error must not be reported as (nil, nil).
+		return nil, fmt.Errorf("no-core-service-client-to-get-child-devices-of-%s", deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.GetChildDevices(subCtx, &common.ID{Id: deviceID})
+}
+
+// listDevicePortsFromCore asks the core for the ports of the device identified by deviceID.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails, so a
+// nil error is never paired with a nil result produced by this helper itself.
+func (dh *DeviceHandler) listDevicePortsFromCore(ctx context.Context, deviceID string) (*voltha.Ports, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// Callers dereference the returned ports after checking only the error,
+		// so (nil, nil) must never be returned here.
+		return nil, fmt.Errorf("no-core-service-client-to-list-ports-of-%s", deviceID)
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.ListDevicePorts(subCtx, &common.ID{Id: deviceID})
+}
+
+// updateDeviceInCore sends device to the core's DeviceUpdate RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) updateDeviceInCore(ctx context.Context, device *voltha.Device) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-update-device")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.DeviceUpdate(subCtx, device)
+	return err
+}
+
+// sendChildDeviceDetectedToCore forwards deviceDiscoveryInfo to the core's
+// ChildDeviceDetected RPC and returns the device the core answers with.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails.
+func (dh *DeviceHandler) sendChildDeviceDetectedToCore(ctx context.Context, deviceDiscoveryInfo *ic.DeviceDiscovery) (*voltha.Device, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// A missing client without an error must not be reported as (nil, nil).
+		return nil, fmt.Errorf("no-core-service-client-to-send-child-device-detected")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.ChildDeviceDetected(subCtx, deviceDiscoveryInfo)
+}
+
+// sendPacketToCore forwards pkt to the core's SendPacketIn RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) sendPacketToCore(ctx context.Context, pkt *ic.PacketIn) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client the packet was not delivered, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-send-packet-in")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.SendPacketIn(subCtx, pkt)
+	return err
+}
+
+// createPortInCore reports port to the core through the PortCreated RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) createPortInCore(ctx context.Context, port *voltha.Port) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-create-port")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.PortCreated(subCtx, port)
+	return err
+}
+
+// updatePortsStateInCore sends portFilter to the core's PortsStateUpdate RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) updatePortsStateInCore(ctx context.Context, portFilter *ic.PortStateFilter) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-update-ports-state")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.PortsStateUpdate(subCtx, portFilter)
+	return err
+}
+
+// updatePortStateInCore sends portState to the core's PortStateUpdate RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. The RPC response is discarded; only the error is returned.
+// A nil core client reported without an error is treated as a failure.
+func (dh *DeviceHandler) updatePortStateInCore(ctx context.Context, portState *ic.PortState) error {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return err
+	}
+	if cClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-core-service-client-to-update-port-state")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = cClient.PortStateUpdate(subCtx, portState)
+	return err
+}
+
+// getPortFromCore asks the core for the port matching portFilter through the GetDevicePort
+// RPC.
+//
+// The RPC runs on a context built from context.Background() by log.WithSpanFromContext and
+// limited to dh.cfg.RPCTimeout. An error is returned when the core client cannot be
+// obtained (including a nil client reported without an error) or when the RPC fails.
+func (dh *DeviceHandler) getPortFromCore(ctx context.Context, portFilter *ic.PortFilter) (*voltha.Port, error) {
+	cClient, err := dh.coreClient.GetCoreServiceClient()
+	if err != nil {
+		return nil, err
+	}
+	if cClient == nil {
+		// A missing client without an error must not be reported as (nil, nil).
+		return nil, fmt.Errorf("no-core-service-client-to-get-device-port")
+	}
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	return cClient.GetDevicePort(subCtx, portFilter)
+}
+
+/*
+Helper functions to communicate with child adapter
+*/
+
+// sendOmciIndicationToChildAdapter delivers response to the child adapter reachable at
+// childEndpoint through its OmciIndication RPC.
+//
+// The client is obtained with getChildAdapterServiceClient. The RPC runs on a context built
+// from context.Background() by log.WithSpanFromContext and limited to dh.cfg.RPCTimeout.
+// The RPC response is discarded; only the error is returned. A nil client reported without
+// an error is treated as a failure.
+func (dh *DeviceHandler) sendOmciIndicationToChildAdapter(ctx context.Context, childEndpoint string, response *ic.OmciMessage) error {
+	aClient, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if err != nil {
+		return err
+	}
+	if aClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-client-for-endpoint-%s", childEndpoint)
+	}
+	logger.Debugw(ctx, "sending-omci-response", log.Fields{"response": response, "child-endpoint": childEndpoint})
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = aClient.OmciIndication(subCtx, response)
+	return err
+}
+
+// sendOnuIndicationToChildAdapter delivers onuInd to the child adapter reachable at
+// childEndpoint through its OnuIndication RPC.
+//
+// The client is obtained with getChildAdapterServiceClient. The RPC runs on a context built
+// from context.Background() by log.WithSpanFromContext and limited to dh.cfg.RPCTimeout.
+// The RPC response is discarded; only the error is returned. A nil client reported without
+// an error is treated as a failure.
+func (dh *DeviceHandler) sendOnuIndicationToChildAdapter(ctx context.Context, childEndpoint string, onuInd *ic.OnuIndicationMessage) error {
+	aClient, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if err != nil {
+		return err
+	}
+	if aClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-client-for-endpoint-%s", childEndpoint)
+	}
+	logger.Debugw(ctx, "sending-onu-indication", log.Fields{"onu-indication": onuInd, "child-endpoint": childEndpoint})
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = aClient.OnuIndication(subCtx, onuInd)
+	return err
+}
+
+// sendDeleteTContToChildAdapter delivers tContInfo to the child adapter reachable at
+// childEndpoint through its DeleteTCont RPC.
+//
+// The client is obtained with getChildAdapterServiceClient. The RPC runs on a context built
+// from context.Background() by log.WithSpanFromContext and limited to dh.cfg.RPCTimeout.
+// The RPC response is discarded; only the error is returned. A nil client reported without
+// an error is treated as a failure.
+func (dh *DeviceHandler) sendDeleteTContToChildAdapter(ctx context.Context, childEndpoint string, tContInfo *ic.DeleteTcontMessage) error {
+	aClient, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if err != nil {
+		return err
+	}
+	if aClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-client-for-endpoint-%s", childEndpoint)
+	}
+	logger.Debugw(ctx, "sending-delete-tcont", log.Fields{"tcont": tContInfo, "child-endpoint": childEndpoint})
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = aClient.DeleteTCont(subCtx, tContInfo)
+	return err
+}
+
+// sendDeleteGemPortToChildAdapter delivers gemPortInfo to the child adapter reachable at
+// childEndpoint through its DeleteGemPort RPC.
+//
+// The client is obtained with getChildAdapterServiceClient. The RPC runs on a context built
+// from context.Background() by log.WithSpanFromContext and limited to dh.cfg.RPCTimeout.
+// The RPC response is discarded; only the error is returned. A nil client reported without
+// an error is treated as a failure.
+func (dh *DeviceHandler) sendDeleteGemPortToChildAdapter(ctx context.Context, childEndpoint string, gemPortInfo *ic.DeleteGemPortMessage) error {
+	aClient, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if err != nil {
+		return err
+	}
+	if aClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-client-for-endpoint-%s", childEndpoint)
+	}
+	logger.Debugw(ctx, "sending-delete-gem-port", log.Fields{"gem-port-info": gemPortInfo, "child-endpoint": childEndpoint})
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = aClient.DeleteGemPort(subCtx, gemPortInfo)
+	return err
+}
+
+// sendDownloadTechProfileToChildAdapter delivers tpDownloadInfo to the child adapter
+// reachable at childEndpoint through its DownloadTechProfile RPC.
+//
+// The client is obtained with getChildAdapterServiceClient. The RPC runs on a context built
+// from context.Background() by log.WithSpanFromContext and limited to dh.cfg.RPCTimeout.
+// The RPC response is discarded; only the error is returned. A nil client reported without
+// an error is treated as a failure.
+func (dh *DeviceHandler) sendDownloadTechProfileToChildAdapter(ctx context.Context, childEndpoint string, tpDownloadInfo *ic.TechProfileDownloadMessage) error {
+	aClient, err := dh.getChildAdapterServiceClient(childEndpoint)
+	if err != nil {
+		return err
+	}
+	if aClient == nil {
+		// Without a client nothing was sent, so do not report success.
+		return fmt.Errorf("no-client-for-endpoint-%s", childEndpoint)
+	}
+	logger.Debugw(ctx, "sending-tech-profile-download", log.Fields{"tp-download-info": tpDownloadInfo, "child-endpoint": childEndpoint})
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	_, err = aClient.DownloadTechProfile(subCtx, tpDownloadInfo)
+	return err
+}
+
+/*
+Helper functions for remote communication
+*/
+
+// setupChildInterAdapterClient creates, caches and starts the gRPC client used to reach the
+// child adapter at endpoint, then waits for that connection to become usable.
+//
+// It returns nil immediately when dh.childAdapterClients already holds an entry for
+// endpoint. The write lock on the client map is held for the whole call, including the wait.
+//
+// An error is returned only when vgrpc.NewClient fails; in that case nothing is stored in
+// the map. After the wait loop ends, whether the connection came up or the backoff reported
+// an error, nil is returned and the started client stays cached.
+//
+// TODO: Use a connection tracker such that the adapter connection is stopped when the last device that adapter
+// supports is deleted
+func (dh *DeviceHandler) setupChildInterAdapterClient(ctx context.Context, endpoint string) error {
+	logger.Infow(ctx, "setting-child-adapter-connection", log.Fields{"child-endpoint": endpoint})
+
+	dh.lockChildAdapterClients.Lock()
+	defer dh.lockChildAdapterClients.Unlock()
+	if _, ok := dh.childAdapterClients[endpoint]; ok {
+		// Already set
+		return nil
+	}
+
+	// Setup child's adapter grpc connection. The client is built in a local first so that a
+	// failed creation never leaves an unusable entry behind in the map.
+	childClient, err := vgrpc.NewClient(endpoint,
+		dh.onuAdapterRestarted,
+		vgrpc.ActivityCheck(true))
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": endpoint})
+		return err
+	}
+	dh.childAdapterClients[endpoint] = childClient
+
+	// The client is started on a context rooted at context.TODO(), not on ctx, so it is not
+	// tied to the lifetime of this call.
+	go childClient.Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestAdapterServiceHandler)
+
+	// Wait until we have a connection to the child adapter.
+	// Unlimited retries or until context expires. The wait uses the caller's ctx so that a
+	// caller-supplied deadline can end it instead of blocking, with the lock held, forever.
+	// NOTE(review): relies on Backoff returning an error once its context is done - confirm.
+	retry := vgrpc.NewBackoff(dh.cfg.MinBackoffRetryDelay, dh.cfg.MaxBackoffRetryDelay, 0)
+	for {
+		svc, svcErr := childClient.GetOnuInterAdapterServiceClient()
+		if svcErr == nil && svc != nil {
+			logger.Infow(ctx, "connected-to-child-adapter", log.Fields{"child-endpoint": endpoint})
+			break
+		}
+		logger.Warnw(ctx, "connection-to-child-adapter-not-ready", log.Fields{"error": svcErr, "child-endpoint": endpoint})
+		// Backoff
+		if waitErr := retry.Backoff(ctx); waitErr != nil {
+			logger.Errorw(ctx, "received-error-on-backoff", log.Fields{"error": waitErr, "child-endpoint": endpoint})
+			break
+		}
+	}
+	return nil
+}
+
+// func (dh *DeviceHandler) getChildAdapterServiceClient(endpoint string) (adapter_services.OnuInterAdapterServiceClient, error) {
+// dh.lockChildAdapterClients.RLock()
+// defer dh.lockChildAdapterClients.RUnlock()
+// if cgClient, ok := dh.childAdapterClients[endpoint]; ok {
+// return cgClient.GetOnuInterAdapterServiceClient()
+// }
+// return nil, fmt.Errorf("no-client-for-endpoint-%s", endpoint)
+// }
+
+// getChildAdapterServiceClient returns the ONU inter-adapter service client for endpoint.
+//
+// A client already present in dh.childAdapterClients is used directly. Otherwise the
+// connection is set up through setupChildInterAdapterClient, using a context limited to
+// dh.cfg.RPCTimeout, and the map is consulted again. An error is returned when the setup
+// fails or when no client exists for endpoint afterwards.
+func (dh *DeviceHandler) getChildAdapterServiceClient(endpoint string) (adapter_services.OnuInterAdapterServiceClient, error) {
+	// Fast path: the client for this endpoint is already cached. The read lock only
+	// protects the map lookup and is released before the client is queried.
+	dh.lockChildAdapterClients.RLock()
+	cached, found := dh.childAdapterClients[endpoint]
+	dh.lockChildAdapterClients.RUnlock()
+	if found {
+		return cached.GetOnuInterAdapterServiceClient()
+	}
+
+	// No cached client - can occur on restarts - so establish the child connection now.
+	setupCtx, cancelSetup := context.WithTimeout(context.Background(), dh.cfg.RPCTimeout)
+	setupErr := dh.setupChildInterAdapterClient(setupCtx, endpoint)
+	cancelSetup()
+	if setupErr != nil {
+		return nil, setupErr
+	}
+
+	// Look the client up again; this time the read lock is kept until the function returns.
+	dh.lockChildAdapterClients.RLock()
+	defer dh.lockChildAdapterClients.RUnlock()
+	created, found := dh.childAdapterClients[endpoint]
+	if !found {
+		return nil, fmt.Errorf("no-client-for-endpoint-%s", endpoint)
+	}
+	return created.GetOnuInterAdapterServiceClient()
+}
+
+// deleteAdapterClients stops every cached child adapter client and removes it from
+// dh.childAdapterClients, holding the write lock on the map for the whole operation.
+func (dh *DeviceHandler) deleteAdapterClients(ctx context.Context) {
+	dh.lockChildAdapterClients.Lock()
+	defer dh.lockChildAdapterClients.Unlock()
+	for endpoint := range dh.childAdapterClients {
+		// Stop the client first, then drop its entry.
+		dh.childAdapterClients[endpoint].Stop(ctx)
+		delete(dh.childAdapterClients, endpoint)
+	}
+}
+
+// onuAdapterRestarted is the handler passed to vgrpc.NewClient when a child adapter
+// connection is set up. It currently only logs a warning carrying the endpoint and always
+// returns nil.
+// TODO: Any action the adapter needs to do following a onu adapter restart?
+func (dh *DeviceHandler) onuAdapterRestarted(ctx context.Context, endPoint string) error {
+	restartInfo := log.Fields{"endpoint": endPoint}
+	logger.Warnw(ctx, "onu-adapter-restarted", restartInfo)
+	return nil
+}
+
+// setAndTestAdapterServiceHandler is used to test whether the remote gRPC service is up.
+// It builds an ONU inter-adapter service client on conn and calls GetHealthStatus on it.
+// The client is returned when the call succeeds and reports HealthStatus_HEALTHY;
+// otherwise nil is returned.
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	client := adapter_services.NewOnuInterAdapterServiceClient(conn)
+	health, err := client.GetHealthStatus(ctx, &empty.Empty{})
+	if err != nil {
+		// The health probe itself failed.
+		return nil
+	}
+	if health.State != voltha.HealthStatus_HEALTHY {
+		// The service answered but does not report itself healthy.
+		return nil
+	}
+	return client
+}