VOL-2293, VOL-2456 improve error handling
Change-Id: I4be5f12719a31b40363758cd47cc02968f180c75
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 6be5be7..9683d31 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -20,7 +20,6 @@
import (
"context"
"encoding/hex"
- "errors"
"fmt"
"io"
"net"
@@ -181,15 +180,13 @@
if ips, err = net.LookupHost(host); err == nil {
log.Debugw("dns-result-ips", log.Fields{"ips": ips})
if addr = net.ParseIP(ips[0]); addr == nil {
- log.Errorw("unable-to-parse-ip", log.Fields{"ip": ips[0]})
- return "", errors.New("unable-to-parse-ip")
+ return "", NewErrInvalidValue(log.Fields{"ip": ips[0]}, nil).Log()
}
genmac = macifyIP(addr)
log.Debugw("using-ip-as-mac", log.Fields{"host": ips[0], "mac": genmac})
return genmac, nil
}
- log.Errorw("cannot-resolve-hostname-to-ip", log.Fields{"host": host})
- return "", err
+ return "", NewErrAdapter("cannot-resolve-hostname-to-ip", nil, err).Log()
}
genmac = macifyIP(addr)
@@ -212,20 +209,19 @@
}
//GetportLabel returns the label for the NNI and the PON port based on port number and port type
-func GetportLabel(portNum uint32, portType voltha.Port_PortType) string {
+func GetportLabel(portNum uint32, portType voltha.Port_PortType) (string, error) {
- if portType == voltha.Port_ETHERNET_NNI {
- return fmt.Sprintf("nni-%d", portNum)
- } else if portType == voltha.Port_PON_OLT {
- return fmt.Sprintf("pon-%d", portNum)
- } else if portType == voltha.Port_ETHERNET_UNI {
- log.Errorw("local UNI management not supported", log.Fields{})
- return ""
+ switch portType {
+ case voltha.Port_ETHERNET_NNI:
+ return fmt.Sprintf("nni-%d", portNum), nil
+ case voltha.Port_PON_OLT:
+ return fmt.Sprintf("pon-%d", portNum), nil
}
- return ""
+
+ return "", NewErrInvalidValue(log.Fields{"port-type": portType}, nil).Log()
}
-func (dh *DeviceHandler) addPort(intfID uint32, portType voltha.Port_PortType, state string) {
+func (dh *DeviceHandler) addPort(intfID uint32, portType voltha.Port_PortType, state string) error {
var operStatus common.OperStatus_Types
if state == "up" {
operStatus = voltha.OperStatus_ACTIVE
@@ -236,26 +232,28 @@
dh.activePorts.Store(intfID, false)
}
portNum := IntfIDToPortNo(intfID, portType)
- label := GetportLabel(portNum, portType)
- if len(label) == 0 {
- log.Errorw("Invalid-port-label", log.Fields{"portNum": portNum, "portType": portType})
- return
+ label, err := GetportLabel(portNum, portType)
+ if err != nil {
+ return NewErrNotFound("port-label", log.Fields{"port-number": portNum, "port-type": portType}, err).Log() // wrap the GetportLabel error so the cause is kept
}
device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
if err != nil || device == nil {
- log.Errorw("Failed-to-fetch-device", log.Fields{"err": err})
- return
+ return NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
}
if device.Ports != nil {
for _, dPort := range device.Ports {
if dPort.Type == portType && dPort.PortNo == portNum {
log.Debug("port-already-exists-updating-oper-status-of-port")
if err := dh.coreProxy.PortStateUpdate(context.TODO(), dh.device.Id, portType, portNum, operStatus); err != nil {
- log.Errorw("failed-to-update-port-state", log.Fields{"err": err})
- return
+ return NewErrAdapter("failed-to-update-port-state", log.Fields{
+ "device-id": dh.device.Id,
+ "port-type": portType,
+ "port-number": portNum,
+ "oper-status": operStatus}, err).Log()
+
}
- return
+ return nil
}
}
}
@@ -269,30 +267,28 @@
log.Debugw("Sending-port-update-to-core", log.Fields{"port": port})
// Synchronous call to update device - this method is run in its own go routine
if err := dh.coreProxy.PortCreated(context.TODO(), dh.device.Id, port); err != nil {
- log.Errorw("Error-creating-port", log.Fields{"deviceID": dh.device.Id, "portType": portType, "error": err})
- return
+ return NewErrAdapter("Error-creating-port", log.Fields{
+ "device-id": dh.device.Id,
+ "port-type": portType}, err).Log()
}
- return
+ return nil
}
// readIndications to read the indications from the OLT device
-func (dh *DeviceHandler) readIndications(ctx context.Context) {
- defer log.Errorw("Indications ended", log.Fields{})
+func (dh *DeviceHandler) readIndications(ctx context.Context) error {
+ defer log.Debugw("indications-ended", log.Fields{"device-id": dh.device.Id})
indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
if err != nil {
- log.Errorw("Failed to read indications", log.Fields{"err": err})
- return
+ return NewErrCommunication("fail-to-read-indications", log.Fields{"device-id": dh.device.Id}, err).Log()
}
if indications == nil {
- log.Errorw("Indications is nil", log.Fields{})
- return
+ return NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
}
/* get device state */
device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
- log.Errorw("Failed to fetch device info", log.Fields{"err": err})
- return
+ return NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
}
// When the device is in DISABLED and Adapter container restarts, we need to
// rebuild the locally maintained admin state.
@@ -325,8 +321,7 @@
time.Sleep(indicationBackoff.NextBackOff())
indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
if err != nil {
- log.Errorw("Failed to read indications", log.Fields{"err": err})
- return
+ return NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
}
continue
}
@@ -338,7 +333,7 @@
}
dh.transitionMap.Handle(ctx, DeviceDownInd)
dh.transitionMap.Handle(ctx, DeviceInit)
- break
+ return NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
}
// Reset backoff if we have a successful receive
indicationBackoff.Reset()
@@ -354,9 +349,10 @@
dh.handleIndication(ctx, indication)
}
+ return nil
}
-func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) {
+func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) error {
raisedTs := time.Now().UnixNano()
if oltIndication.OperState == "up" && dh.transitionMap.currentDeviceState != deviceStateUp {
dh.transitionMap.Handle(ctx, DeviceUpInd)
@@ -364,27 +360,48 @@
dh.transitionMap.Handle(ctx, DeviceDownInd)
}
// Send or clear Alarm
- dh.eventMgr.oltUpDownIndication(oltIndication, dh.deviceID, raisedTs)
+ if err := dh.eventMgr.oltUpDownIndication(oltIndication, dh.deviceID, raisedTs); err != nil {
+ return NewErrAdapter("failed-indication", log.Fields{
+ "device-id": dh.deviceID, // same key spelling as the other error fields in this change
+ "indication": oltIndication,
+ "timestamp": raisedTs}, err).Log()
+ }
+ return nil
}
+// nolint: gocyclo
func (dh *DeviceHandler) handleIndication(ctx context.Context, indication *oop.Indication) {
raisedTs := time.Now().UnixNano()
switch indication.Data.(type) {
case *oop.Indication_OltInd:
- dh.handleOltIndication(ctx, indication.GetOltInd())
+ if err := dh.handleOltIndication(ctx, indication.GetOltInd()); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "olt"}, err).Log()
+ }
case *oop.Indication_IntfInd:
intfInd := indication.GetIntfInd()
- go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
+ go func() {
+ if err := dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState()); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "interface"}, err).Log()
+ }
+ }()
log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
case *oop.Indication_IntfOperInd:
intfOperInd := indication.GetIntfOperInd()
if intfOperInd.GetType() == "nni" {
- go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+ go func() {
+ if err := dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState()); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-nni"}, err).Log()
+ }
+ }()
dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId())
} else if intfOperInd.GetType() == "pon" {
// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
// Handle pon port update
- go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_PON_OLT, intfOperInd.GetOperState())
+ go func() {
+ if err := dh.addPort(intfOperInd.GetIntfId(), voltha.Port_PON_OLT, intfOperInd.GetOperState()); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-pon"}, err).Log()
+ }
+ }()
go dh.eventMgr.oltIntfOperIndication(indication.GetIntfOperInd(), dh.deviceID, raisedTs)
}
log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
@@ -392,19 +409,35 @@
onuDiscInd := indication.GetOnuDiscInd()
log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
- go dh.onuDiscIndication(ctx, onuDiscInd, sn)
+ go func() {
+ if err := dh.onuDiscIndication(ctx, onuDiscInd, sn); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "onu-discovery"}, err).Log()
+ }
+ }()
case *oop.Indication_OnuInd:
onuInd := indication.GetOnuInd()
log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
- go dh.onuIndication(onuInd)
+ go func() {
+ if err := dh.onuIndication(onuInd); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "onu"}, err).Log()
+ }
+ }()
case *oop.Indication_OmciInd:
omciInd := indication.GetOmciInd()
log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
- go dh.omciIndication(omciInd)
+ go func() {
+ if err := dh.omciIndication(omciInd); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "omci"}, err).Log()
+ }
+ }()
case *oop.Indication_PktInd:
pktInd := indication.GetPktInd()
log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
- go dh.handlePacketIndication(ctx, pktInd)
+ go func() {
+ if err := dh.handlePacketIndication(ctx, pktInd); err != nil {
+ NewErrAdapter("handle-indication-error", log.Fields{"type": "packet"}, err).Log()
+ }
+ }()
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
@@ -423,8 +456,7 @@
// Synchronous call to update device state - this method is run in its own go routine
if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
voltha.OperStatus_ACTIVE); err != nil {
- log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceID": dh.device.Id, "error": err})
- return err
+ return NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
return nil
}
@@ -438,15 +470,13 @@
device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
- log.Errorw("Failed to fetch device device", log.Fields{"err": err})
- return errors.New("failed to fetch device device")
+ return NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
}
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
- if er := dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
- log.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": er})
- return er
+ if err = dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
+ return NewErrAdapter("port-update-failed", log.Fields{"device-id": device.Id}, err).Log()
}
//Update the device oper state and connection status
@@ -454,44 +484,45 @@
cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
dh.device = cloned
- if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
- log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": er})
- return er
+ if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ return NewErrAdapter("state-update-failed", log.Fields{"device-id": device.Id}, err).Log()
}
//get the child device for the parent device
onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
if err != nil {
- log.Errorw("failed to get child devices information", log.Fields{"deviceID": dh.device.Id, "error": err})
- return err
+ return NewErrAdapter("child-device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
for _, onuDevice := range onuDevices.Items {
// Update onu state as down in onu adapter
onuInd := oop.OnuIndication{}
onuInd.OperState = "down"
- er := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
+ err := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
- if er != nil {
- log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
+ if err != nil {
+ NewErrCommunication("inter-adapter-send-failed", log.Fields{
+ "source": "openolt",
+ "onu-indicator": onuInd,
+ "device-type": onuDevice.Type,
+ "device-id": onuDevice.Id}, err).LogAt(log.ErrorLevel)
//Do not return here and continue to process other ONUs
}
}
/* Discovered ONUs entries need to be cleared , since after OLT
is up, it starts sending discovery indications again*/
dh.discOnus = sync.Map{}
- log.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
+ log.Debugw("do-state-down-end", log.Fields{"device-id": device.Id})
return nil
}
// doStateInit dial the grpc before going to init state
func (dh *DeviceHandler) doStateInit(ctx context.Context) error {
var err error
- dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure(), grpc.WithBlock())
- if err != nil {
- log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceID, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
- return err
+ if dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure(), grpc.WithBlock()); err != nil {
+ return NewErrCommunication("dial-failure", log.Fields{
+ "device-id": dh.deviceID,
+ "host-and-port": dh.device.GetHostAndPort()}, err).Log()
}
return nil
}
@@ -513,7 +544,7 @@
device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
- log.Errorw("Failed to fetch device device", log.Fields{"err": err})
+ return NewErrAdapter("device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err).Log() // stop here: device may be nil and is cloned and dereferenced right after
}
cloned := proto.Clone(device).(*voltha.Device)
@@ -521,48 +552,47 @@
cloned.OperStatus = voltha.OperStatus_UNKNOWN
dh.device = cloned
if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
- log.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.device.Id, "error": er})
+ NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, er).LogAt(log.ErrorLevel) // wrap er (this call's failure), not the earlier GetDevice err
}
// Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
_, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
if err != nil {
- log.Errorw("Failed to disable olt ", log.Fields{"err": err})
+ NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
// Start reading indications
- go dh.readIndications(ctx)
+ go func() {
+ if err := dh.readIndications(ctx); err != nil {
+ NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+ }
+ }()
return nil
}
deviceInfo, err := dh.populateDeviceInfo()
if err != nil {
- log.Errorw("Unable to populate Device Info", log.Fields{"err": err})
- return err
+ return NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
- log.Errorw("Failed to fetch device device", log.Fields{"err": err})
- return err
+ return NewErrAdapter("fetch-device-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
dh.populateActivePorts(device)
if err := dh.disableAdminDownPorts(device); err != nil {
- log.Errorw("Error-on-updating-port-status", log.Fields{"device": device})
- return err
+ return NewErrAdapter("port-status-update-failed", log.Fields{"device": device}, err).Log()
}
KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
// Instantiate resource manager
if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
- log.Error("Error while instantiating resource manager")
- return errors.New("instantiating resource manager failed")
+ return ErrResourceManagerInstantiating.Log()
}
// Instantiate flow manager
if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
- log.Error("Error while instantiating flow manager")
- return errors.New("instantiating flow manager failed")
+ return NewErrAdapter("flow-manager-instantiating-failed", log.Fields{"device-id": dh.deviceID}, nil).Log() // distinct from the resource-manager failure above
}
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
@@ -571,7 +601,11 @@
dh.portStats = NewOpenOltStatsMgr(dh)
// Start reading indications
- go dh.readIndications(ctx)
+ go func() {
+ if err := dh.readIndications(ctx); err != nil {
+ NewErrAdapter("read-indications-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
+ }()
return nil
}
@@ -582,12 +616,10 @@
deviceInfo, err = dh.Client.GetDeviceInfo(context.Background(), new(oop.Empty))
if err != nil {
- log.Errorw("Failed to fetch device info", log.Fields{"err": err})
- return nil, err
+ return nil, NewErrPersistence("get", "device", 0, nil, err).Log()
}
if deviceInfo == nil {
- log.Errorw("Device info is nil", log.Fields{})
- return nil, errors.New("failed to get device info from OLT")
+ return nil, NewErrInvalidValue(log.Fields{"device": nil}, nil).Log()
}
log.Debugw("Fetched device info", log.Fields{"deviceInfo": deviceInfo})
@@ -603,7 +635,7 @@
host := strings.Split(dh.device.GetHostAndPort(), ":")[0]
genmac, err := generateMacFromHost(host)
if err != nil {
- return nil, err
+ return nil, NewErrAdapter("failed-to-generate-mac-host", log.Fields{"host": host}, err).Log()
}
log.Debugw("using-host-for-mac-address", log.Fields{"host": host, "mac": genmac})
dh.device.MacAddress = genmac
@@ -613,8 +645,7 @@
// Synchronous call to update device - this method is run in its own go routine
if err := dh.coreProxy.DeviceUpdate(context.TODO(), dh.device); err != nil {
- log.Errorw("error-updating-device", log.Fields{"deviceID": dh.device.Id, "error": err})
- return nil, err
+ return nil, NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
return deviceInfo, nil
@@ -663,7 +694,7 @@
// Now, set the initial PM configuration for that device
if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
- log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
}
go startCollector(dh)
@@ -711,7 +742,7 @@
}, nil
}
-func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) {
+func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) error {
log.Debugw("omci indication", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
var deviceType string
var deviceID string
@@ -729,8 +760,9 @@
onuDevice, err := dh.coreProxy.GetChildDevice(context.TODO(), dh.device.Id, kwargs)
if err != nil {
- log.Errorw("onu not found", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId, "error": err})
- return
+ return NewErrNotFound("onu", log.Fields{
+ "interface-id": omciInd.IntfId,
+ "onu-id": omciInd.OnuId}, err).Log()
}
deviceType = onuDevice.Type
deviceID = onuDevice.Id
@@ -746,13 +778,16 @@
}
omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
- if sendErr := dh.AdapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
+ if err := dh.AdapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
ic.InterAdapterMessageType_OMCI_REQUEST, dh.deviceType, deviceType,
- deviceID, proxyDeviceID, ""); sendErr != nil {
- log.Errorw("send omci request error", log.Fields{"fromAdapter": dh.deviceType, "toAdapter": deviceType, "onuID": deviceID, "proxyDeviceID": proxyDeviceID, "error": sendErr})
- return
+ deviceID, proxyDeviceID, ""); err != nil {
+ return NewErrCommunication("omci-request", log.Fields{
+ "source": dh.deviceType,
+ "destination": deviceType,
+ "onu-id": deviceID,
+ "proxy-device-id": proxyDeviceID}, err).Log()
}
- return
+ return nil
}
//ProcessInterAdapterMessage sends the proxied messages to the target device
@@ -780,24 +815,32 @@
if omciMsg.GetProxyAddress() == nil {
onuDevice, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, toDeviceID)
if err != nil {
- log.Errorw("onu not found", log.Fields{"onuDeviceId": toDeviceID, "error": err})
- return err
+ return NewErrNotFound("onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err).Log()
}
log.Debugw("device retrieved from core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- dh.sendProxiedMessage(onuDevice, omciMsg)
-
+ if err := dh.sendProxiedMessage(onuDevice, omciMsg); err != nil {
+ return NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err).Log()
+ }
} else {
log.Debugw("Proxy Address found in omci message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- dh.sendProxiedMessage(nil, omciMsg)
+ if err := dh.sendProxiedMessage(nil, omciMsg); err != nil {
+ return NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err).Log()
+ }
}
} else {
- log.Errorw("inter-adapter-unhandled-type", log.Fields{"msgType": msg.Header.Type})
+ return NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil).Log()
}
return nil
}
-func (dh *DeviceHandler) sendProxiedMessage(onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) {
+func (dh *DeviceHandler) sendProxiedMessage(onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) error {
var intfID uint32
var onuID uint32
var connectStatus common.ConnectStatus_Types
@@ -812,7 +855,10 @@
}
if connectStatus != voltha.ConnectStatus_REACHABLE {
log.Debugw("ONU is not reachable, cannot send OMCI", log.Fields{"intfID": intfID, "onuID": onuID})
- return
+
+ return NewErrCommunication("unreachable", log.Fields{
+ "interface-id": intfID,
+ "onu-id": onuID}, nil).Log()
}
// TODO: Once we are sure openonu/openomci is sending only binary in omciMsg.Message, we can remove this check
@@ -834,13 +880,16 @@
_, err := dh.Client.OmciMsgOut(context.Background(), omciMessage)
if err != nil {
- log.Errorw("unable to send omci-msg-out", log.Fields{"IntfID": intfID, "OnuID": onuID, "Msg": omciMessage})
- return
+ return NewErrCommunication("omci-send-failed", log.Fields{
+ "interface-id": intfID,
+ "onu-id": onuID,
+ "message": omciMessage}, err).Log()
}
log.Debugw("Sent Omci message", log.Fields{"intfID": intfID, "onuID": onuID, "omciMsg": hex.EncodeToString(omciMsg.Message)})
+ return nil
}
-func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) {
+func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber)
// TODO: need resource manager
@@ -851,14 +900,15 @@
if st.Code() == codes.AlreadyExists {
log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
} else {
- log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
+ return NewErrAdapter("onu-activate-failed", log.Fields{"onu": Onu}, err).Log()
}
} else {
log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
}
+ return nil
}
-func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) {
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) error {
channelID := onuDiscInd.GetIntfId()
parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -869,13 +919,12 @@
if sn != "" {
kwargs["serial_number"] = sn
} else {
- log.Errorw("invalid-onu-serial-number", log.Fields{"sn": sn})
- return
+ return NewErrInvalidValue(log.Fields{"serial-number": sn}, nil).Log()
}
if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
log.Warnw("onu-sn-is-already-being-processed", log.Fields{"sn": sn})
- return
+ return nil
}
var onuID uint32
@@ -885,7 +934,7 @@
onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
if err != nil {
- log.Errorw("core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
+ log.Warnw("core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
if e, ok := status.FromError(err); ok {
log.Warnw("core-proxy-get-child-device-failed-with-code", log.Fields{"errCode": e.Code(), "sn": sn})
switch e.Code() {
@@ -895,7 +944,7 @@
case codes.DeadlineExceeded:
// if the call times out, cleanup and exit
dh.discOnus.Delete(sn)
- return
+ return NewErrTimeout("get-child-device", log.Fields{"device-id": dh.device.Id}, err).Log()
}
}
}
@@ -914,17 +963,19 @@
if err != nil {
// if we can't create an ID in resource manager,
// cleanup and exit
- log.Warnw("resource-manage-get-onu-id-failed", log.Fields{"pon-intf-id": ponintfid, "err": err, "sn": sn})
dh.discOnus.Delete(sn)
- return
+ return NewErrAdapter("resource-manage-get-onu-id-failed", log.Fields{
+ "pon-interface-id": ponintfid,
+ "serial-number": sn}, err).Log()
}
if onuDevice, err = dh.coreProxy.ChildDeviceDetected(context.TODO(), dh.device.Id, int(parentPortNo),
"", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
- log.Warnw("core-proxy-child-device-detected-failed", log.Fields{"pon-intf-id": ponintfid, "err": err, "sn": sn})
dh.discOnus.Delete(sn)
dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
- return
+ return NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
+ "pon-interface-id": ponintfid,
+ "serial-number": sn}, err).Log()
}
log.Infow("onu-child-device-added", log.Fields{"onuDevice": onuDevice, "sn": sn})
@@ -943,119 +994,140 @@
dh.onus.Store(onuKey, onuDev)
log.Debugw("new-onu-device-discovered", log.Fields{"onu": onuDev, "sn": sn})
- err = dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED)
- if err != nil {
- log.Errorw("failed-to-update-device-state", log.Fields{"DeviceID": onuDevice.Id, "sn": sn, "err": err})
- return
+ if err = dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED); err != nil {
+ return NewErrAdapter("failed-to-update-device-state", log.Fields{
+ "device-id": onuDevice.Id,
+ "serial-number": sn}, err).Log()
}
log.Infow("onu-discovered-reachable", log.Fields{"deviceId": onuDevice.Id, "sn": sn})
//TODO: We put this sleep here to prevent the race between state update and onuIndication
//In onuIndication the operStatus of device is checked. If it is still not updated in KV store
//then the initialisation fails.
time.Sleep(1 * time.Second)
- dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn)
- return
+ if err = dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn); err != nil {
+ return NewErrAdapter("onu-activation-failed", log.Fields{
+ "device-id": onuDevice.Id,
+ "serial-number": sn}, err).Log()
+ }
+ return nil
}
-func (dh *DeviceHandler) onuIndication(onuInd *oop.OnuIndication) {
+func (dh *DeviceHandler) onuIndication(onuInd *oop.OnuIndication) error {
serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
kwargs := make(map[string]interface{})
ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
var onuDevice *voltha.Device
+ var err error
foundInCache := false
log.Debugw("ONU indication key create", log.Fields{"onuId": onuInd.OnuId,
"intfId": onuInd.GetIntfId()})
onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
+ errFields := log.Fields{"device-id": dh.device.Id}
+
if onuInCache, ok := dh.onus.Load(onuKey); ok {
//If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
foundInCache = true
- onuDevice, _ = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
+ errFields["onu-id"] = onuInCache.(*OnuDevice).deviceID
+ onuDevice, err = dh.coreProxy.GetDevice(nil, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
} else {
//If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
if serialNumber != "" {
kwargs["serial_number"] = serialNumber
+ errFields["serial-number"] = serialNumber
} else {
kwargs["onu_id"] = onuInd.OnuId
kwargs["parent_port_no"] = ponPort
+ errFields["onu-id"] = onuInd.OnuId
+ errFields["parent-port-no"] = ponPort
}
- onuDevice, _ = dh.coreProxy.GetChildDevice(context.TODO(), dh.device.Id, kwargs)
+ onuDevice, err = dh.coreProxy.GetChildDevice(context.TODO(), dh.device.Id, kwargs)
}
- if onuDevice != nil {
- if onuDevice.ParentPortNo != ponPort {
- //log.Warnw("ONU-is-on-a-different-intf-id-now", log.Fields{"previousIntfId": intfIDFromPortNo(onuDevice.ParentPortNo), "currentIntfId": onuInd.GetIntfId()})
- log.Warnw("ONU-is-on-a-different-intf-id-now", log.Fields{"previousIntfId": onuDevice.ParentPortNo, "currentIntfId": ponPort})
- }
-
- if onuDevice.ProxyAddress.OnuId != onuInd.OnuId {
- log.Warnw("ONU-id-mismatch, can happen if both voltha and the olt rebooted", log.Fields{"expected_onu_id": onuDevice.ProxyAddress.OnuId, "received_onu_id": onuInd.OnuId})
- }
- if !foundInCache {
- onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
-
- dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId))
-
- }
- dh.updateOnuStates(onuDevice, onuInd, foundInCache)
-
- } else {
- log.Errorw("onu not found", log.Fields{"intfID": onuInd.IntfId, "onuID": onuInd.OnuId})
- return
+ if err != nil || onuDevice == nil {
+ return NewErrNotFound("onu-device", errFields, err).Log()
}
+ if onuDevice.ParentPortNo != ponPort {
+ log.Warnw("ONU-is-on-a-different-intf-id-now", log.Fields{
+ "previousIntfId": onuDevice.ParentPortNo,
+ "currentIntfId": ponPort})
+ }
+
+ if onuDevice.ProxyAddress.OnuId != onuInd.OnuId {
+ log.Warnw("ONU-id-mismatch, can happen if both voltha and the olt rebooted", log.Fields{
+ "expected_onu_id": onuDevice.ProxyAddress.OnuId,
+ "received_onu_id": onuInd.OnuId})
+ }
+ if !foundInCache {
+ onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
+
+ dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId))
+
+ }
+ if err := dh.updateOnuStates(onuDevice, onuInd, foundInCache); err != nil {
+ return NewErrCommunication("state-update-failed", errFields, err).Log()
+ }
+ return nil
}
-func (dh *DeviceHandler) updateOnuStates(onuDevice *voltha.Device, onuInd *oop.OnuIndication, foundInCache bool) {
+func (dh *DeviceHandler) updateOnuStates(onuDevice *voltha.Device, onuInd *oop.OnuIndication, foundInCache bool) error {
ctx := context.TODO()
log.Debugw("onu-indication-for-state", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
dh.updateOnuAdminState(onuInd)
- // operState
- if onuInd.OperState == "down" {
+ switch onuInd.OperState {
+ case "down":
log.Debugw("sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
- log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
+ return NewErrCommunication("inter-adapter-send-failed", log.Fields{
+ "onu-indicator": onuInd,
+ "source": "openolt",
+ "device-type": onuDevice.Type,
+ "device-id": onuDevice.Id}, err).Log()
}
- } else if onuInd.OperState == "up" {
+ case "up":
// Ignore operstatus if device was found in cache
if !foundInCache && onuDevice.OperStatus != common.OperStatus_DISCOVERED {
log.Warnw("ignore-onu-indication", log.Fields{"intfID": onuInd.IntfId, "onuID": onuInd.OnuId, "operStatus": onuDevice.OperStatus, "msgOperStatus": onuInd.OperState})
- return
+ return nil
}
log.Debugw("sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
- log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
- return
+ return NewErrCommunication("inter-adapter-send-failed", log.Fields{
+ "onu-indicator": onuInd,
+ "source": "openolt",
+ "device-type": onuDevice.Type,
+ "device-id": onuDevice.Id}, err).Log()
}
- } else {
- log.Warnw("Not-implemented-or-invalid-value-of-oper-state", log.Fields{"operState": onuInd.OperState})
+ default:
+ return NewErrInvalidValue(log.Fields{"oper-state": onuInd.OperState}, nil).Log()
}
+ return nil
}
-func (dh *DeviceHandler) updateOnuAdminState(onuInd *oop.OnuIndication) {
- if onuInd.AdminState == "down" {
+func (dh *DeviceHandler) updateOnuAdminState(onuInd *oop.OnuIndication) error {
+ switch onuInd.AdminState {
+ case "down":
if onuInd.OperState != "down" {
log.Errorw("ONU-admin-state-down-and-oper-status-not-down", log.Fields{"operState": onuInd.OperState})
// Forcing the oper state change code to execute
onuInd.OperState = "down"
}
// Port and logical port update is taken care of by oper state block
- } else if onuInd.AdminState == "up" {
+ case "up":
log.Debugln("received-onu-admin-state up")
- } else {
- log.Errorw("Invalid-or-not-implemented-admin-state", log.Fields{"received-admin-state": onuInd.AdminState})
+ default:
+ return NewErrInvalidValue(log.Fields{"admin-state": onuInd.AdminState}, nil).Log()
}
- log.Debugln("admin-state-dealt-with")
+ return nil
}
func (dh *DeviceHandler) stringifySerialNumber(serialNum *oop.SerialNumber) string {
@@ -1079,42 +1151,45 @@
//UpdateFlowsBulk upates the bulk flow
func (dh *DeviceHandler) UpdateFlowsBulk() error {
- return errors.New("unimplemented")
+ return ErrNotImplemented
}
//GetChildDevice returns the child device for given parent port and onu id
-func (dh *DeviceHandler) GetChildDevice(parentPort, onuID uint32) *voltha.Device {
+func (dh *DeviceHandler) GetChildDevice(parentPort, onuID uint32) (*voltha.Device, error) {
log.Debugw("GetChildDevice", log.Fields{"pon port": parentPort, "onuID": onuID})
kwargs := make(map[string]interface{})
kwargs["onu_id"] = onuID
kwargs["parent_port_no"] = parentPort
onuDevice, err := dh.coreProxy.GetChildDevice(context.TODO(), dh.device.Id, kwargs)
if err != nil {
- log.Errorw("onu not found", log.Fields{"intfID": parentPort, "onuID": onuID})
- return nil
+ return nil, NewErrNotFound("onu", log.Fields{
+ "interface-id": parentPort,
+ "onu-id": onuID}, err).Log()
}
log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
- return onuDevice
+ return onuDevice, nil
}
// SendPacketInToCore sends packet-in to core
// For this, it calls SendPacketIn of the core-proxy which uses a device specific topic to send the request.
// The adapter handling the device creates a device specific topic
-func (dh *DeviceHandler) SendPacketInToCore(logicalPort uint32, packetPayload []byte) {
+func (dh *DeviceHandler) SendPacketInToCore(logicalPort uint32, packetPayload []byte) error {
log.Debugw("send-packet-in-to-core", log.Fields{
"port": logicalPort,
"packet": hex.EncodeToString(packetPayload),
})
if err := dh.coreProxy.SendPacketIn(context.TODO(), dh.device.Id, logicalPort, packetPayload); err != nil {
- log.Errorw("Error sending packetin to core", log.Fields{
- "error": err,
- "packet": hex.EncodeToString(packetPayload),
- })
- return
+ return NewErrCommunication("packet-send-failed", log.Fields{
+ "source": "adapter",
+ "destination": "core",
+ "device-id": dh.device.Id,
+ "logical-port": logicalPort,
+ "packet": hex.EncodeToString(packetPayload)}, err).Log()
}
log.Debugw("Sent packet-in to core successfully", log.Fields{
"packet": hex.EncodeToString(packetPayload),
})
+ return nil
}
// AddUniPortToOnu adds the uni port to the onu device
@@ -1180,11 +1255,10 @@
if dh.Client != nil {
if _, err := dh.Client.DisableOlt(context.Background(), new(oop.Empty)); err != nil {
if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
- log.Errorw("failed-to-disable-olt ", log.Fields{"err": err, "deviceID": device.Id})
dh.lockDevice.Lock()
dh.adminState = "up"
dh.lockDevice.Unlock()
- return err
+ return NewErrAdapter("olt-disable-failed", log.Fields{"device-id": device.Id}, err).Log()
}
}
}
@@ -1247,11 +1321,10 @@
if _, err := dh.Client.ReenableOlt(context.Background(), new(oop.Empty)); err != nil {
if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
- log.Errorw("Failed to reenable olt ", log.Fields{"err": err})
dh.lockDevice.Lock()
dh.adminState = "down"
dh.lockDevice.Unlock()
- return err
+ return NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
}
}
log.Debug("olt-reenabled")
@@ -1260,16 +1333,17 @@
// Update the all ports state on that device to enable
if err := dh.disableAdminDownPorts(device); err != nil {
- log.Errorw("Error-on-updating-port-status-after-reenabling-olt", log.Fields{"device": device})
- return err
+ return NewErrAdapter("port-status-update-failed-after-olt-reenable", log.Fields{"device": device}, err).Log()
}
//Update the device oper status as ACTIVE
cloned.OperStatus = voltha.OperStatus_ACTIVE
dh.device = cloned
if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
- log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": err})
- return err
+ return NewErrAdapter("state-update-failed", log.Fields{
+ "device-id": device.Id,
+ "connect-status": cloned.ConnectStatus,
+ "oper-status": cloned.OperStatus}, err).Log()
}
log.Debugw("ReEnableDevice-end", log.Fields{"deviceID": device.Id})
@@ -1325,8 +1399,7 @@
//Free the flow-ids for the NNI port
nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
if err != nil {
- log.Error("Failed to fetch nni from kv store")
- return err
+ return NewErrPersistence("get", "nni", 0, nil, err).Log()
}
log.Debugw("NNI are ", log.Fields{"nni": nni})
for _, nniIntfID := range nni {
@@ -1338,10 +1411,9 @@
dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
}
if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
- log.Error("Failed to clear nni from kv store")
- return err
+ return NewErrPersistence("clear", "nni", 0, nil, err).Log()
}
- return err
+ return nil
}
// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
@@ -1365,8 +1437,9 @@
var onuGemData []rsrcMgr.OnuGemInfo
err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
if err != nil {
- log.Errorw("Failed to get onu info for port ", log.Fields{"ponport": ponPort})
- return err
+ return NewErrNotFound("onu", log.Fields{
+ "device-id": dh.device.Id,
+ "pon-port": ponPort}, err).Log()
}
for _, onu := range onuGemData {
onuID := make([]uint32, 1)
@@ -1413,16 +1486,17 @@
//Reset the state
if dh.Client != nil {
if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
- log.Errorw("Failed-to-reboot-olt ", log.Fields{"deviceID": dh.deviceID, "err": err})
- return err
+ return NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err).Log()
}
}
cloned := proto.Clone(device).(*voltha.Device)
cloned.OperStatus = voltha.OperStatus_UNKNOWN
cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
- log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": err})
- return err
+ return NewErrAdapter("device-state-update-failed", log.Fields{
+ "device-id": device.Id,
+ "connect-status": cloned.ConnectStatus,
+ "oper-status": cloned.OperStatus}, err).Log()
}
return nil
}
@@ -1430,42 +1504,35 @@
//RebootDevice reboots the given device
func (dh *DeviceHandler) RebootDevice(device *voltha.Device) error {
if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
- log.Errorw("Failed to reboot olt ", log.Fields{"deviceID": dh.deviceID, "err": err})
- return err
+ return NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err).Log()
}
-
log.Debugw("rebooted-device-successfully", log.Fields{"deviceID": device.Id})
-
return nil
}
-func (dh *DeviceHandler) handlePacketIndication(ctx context.Context, packetIn *oop.PacketIndication) {
+func (dh *DeviceHandler) handlePacketIndication(ctx context.Context, packetIn *oop.PacketIndication) error {
log.Debugw("Received packet-in", log.Fields{
"packet-indication": *packetIn,
"packet": hex.EncodeToString(packetIn.Pkt),
})
logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
if err != nil {
- log.Errorw("Error getting logical port from packet-in", log.Fields{
- "error": err,
- "packet": hex.EncodeToString(packetIn.Pkt),
- })
- return
+ return NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err).Log()
}
log.Debugw("sending packet-in to core", log.Fields{
"logicalPortNum": logicalPortNum,
"packet": hex.EncodeToString(packetIn.Pkt),
})
if err := dh.coreProxy.SendPacketIn(context.TODO(), dh.device.Id, logicalPortNum, packetIn.Pkt); err != nil {
- log.Errorw("Error sending packet-in to core", log.Fields{
- "error": err,
- "packet": hex.EncodeToString(packetIn.Pkt),
- })
- return
+ return NewErrCommunication("send-packet-in", log.Fields{
+ "destination": "core",
+ "source": dh.deviceType,
+ "packet": hex.EncodeToString(packetIn.Pkt)}, err).Log()
}
log.Debugw("Success sending packet-in to core!", log.Fields{
"packet": hex.EncodeToString(packetIn.Pkt),
})
+ return nil
}
// PacketOut sends packet-out from VOLTHA to OLT on the egress port provided
@@ -1523,14 +1590,22 @@
})
if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
- log.Errorw("Error while sending packet-out to ONU", log.Fields{
- "error": err,
- "packet": hex.EncodeToString(packet.Data),
- })
- return err
+ return NewErrCommunication("packet-out-send", log.Fields{
+ "source": "adapter",
+ "destination": "onu",
+ "egress-port-number": egressPortNo,
+ "interface-id": intfID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "packet": hex.EncodeToString(packet.Data)}, err).Log()
}
} else if egressPortType == voltha.Port_ETHERNET_NNI {
- uplinkPkt := oop.UplinkPacket{IntfId: IntfIDFromNniPortNum(uint32(egressPortNo)), Pkt: packet.Data}
+ nniIntfID, err := IntfIDFromNniPortNum(uint32(egressPortNo))
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"egress-nni-port": egressPortNo}, err).Log()
+ }
+ uplinkPkt := oop.UplinkPacket{IntfId: nniIntfID, Pkt: packet.Data}
log.Debugw("sending-packet-to-nni", log.Fields{
"uplink_pkt": uplinkPkt,
@@ -1538,11 +1613,7 @@
})
if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
- log.Errorw("Error while sending packet-out to NNI", log.Fields{
- "error": err,
- "packet": hex.EncodeToString(packet.Data),
- })
- return err
+ return NewErrCommunication("packet-out-to-nni", log.Fields{"packet": hex.EncodeToString(packet.Data)}, err).Log()
}
} else {
log.Warnw("Packet-out-to-this-interface-type-not-implemented", log.Fields{
@@ -1602,8 +1673,7 @@
go dh.notifyChildDevices("unreachable")
if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
- log.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.device.Id, "error": err})
- return
+ NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
}
@@ -1627,7 +1697,9 @@
// Bug is opened for VOL-2505 to support NNI disable feature.
log.Infow("voltha-supports-single-nni-hence-disable-of-nni-not-allowed",
log.Fields{"Device": dh.device, "port": port})
- return fmt.Errorf("received-disable-enable-nni-port-request, received-port %s", port.GetType())
+ return NewErrAdapter("illegal-port-request", log.Fields{
+ "port-type": port.GetType(),
+ "enable-state": enablePort}, nil).Log()
}
// fetch interfaceid from PortNo
ponID := PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
@@ -1638,8 +1710,9 @@
out, err := dh.Client.EnablePonIf(ctx, ponIntf)
if err != nil {
- log.Errorw("error-while-enable-Pon-port", log.Fields{"DeviceID": dh.device, "Port": port, "error": err})
- return err
+ return NewErrAdapter("pon-port-enable-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "port": port}, err).Log()
}
// updating interface local cache for collecting stats
dh.activePorts.Store(ponID, true)
@@ -1648,16 +1721,18 @@
operStatus = voltha.OperStatus_UNKNOWN
out, err := dh.Client.DisablePonIf(ctx, ponIntf)
if err != nil {
- log.Errorw("error-while-disabling-interface", log.Fields{"DeviceID": dh.device, "Port": port})
- return err
+ return NewErrAdapter("pon-port-disable-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "port": port}, err).Log()
}
// updating interface local cache for collecting stats
dh.activePorts.Store(ponID, false)
log.Infow("disabled-pon-port", log.Fields{"out": out, "DeviceID": dh.device, "Port": port})
}
- if errs := dh.coreProxy.PortStateUpdate(ctx, dh.deviceID, voltha.Port_PON_OLT, port.PortNo, operStatus); errs != nil {
- log.Errorw("portstate-update-failed", log.Fields{"Device": dh.deviceID, "port": port.PortNo, "error": errs})
- return errs
+ if err := dh.coreProxy.PortStateUpdate(ctx, dh.deviceID, voltha.Port_PON_OLT, port.PortNo, operStatus); err != nil {
+ return NewErrAdapter("port-state-update-failed", log.Fields{
+ "device-id": dh.deviceID,
+ "port": port.PortNo}, err).Log()
}
return nil
}
@@ -1670,8 +1745,9 @@
for _, port := range cloned.Ports {
if port.AdminState == common.AdminState_DISABLED {
if err := dh.DisablePort(port); err != nil {
- log.Errorw("error-occurred-while-disabling-port", log.Fields{"DeviceId": dh.deviceID, "port": port, "error": err})
- return err
+ return NewErrAdapter("port-disable-failed", log.Fields{
+ "device-id": dh.deviceID,
+ "port": port}, err).Log()
}
}
}
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index aacc930..fbb224d 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -275,6 +275,36 @@
}
}
+// sparseCompare reports whether spec and target have the same dynamic type and
+// hold equal values in every struct field named in keys (pointers are followed).
+func sparseCompare(keys []string, spec, target interface{}) bool {
+	if spec == target {
+		return true
+	}
+	if spec == nil || target == nil {
+		return false
+	}
+	if reflect.TypeOf(spec) != reflect.TypeOf(target) {
+		return false
+	}
+	vSpec := reflect.ValueOf(spec)
+	vTarget := reflect.ValueOf(target)
+	if vSpec.Kind() == reflect.Ptr {
+		// Both-nil was accepted above; a single nil pointer cannot be dereferenced.
+		if vSpec.IsNil() || vTarget.IsNil() {
+			return false
+		}
+		vSpec = vSpec.Elem()
+		vTarget = vTarget.Elem()
+	}
+	for _, key := range keys {
+		if !reflect.DeepEqual(vSpec.FieldByName(key).Interface(), vTarget.FieldByName(key).Interface()) {
+			return false
+		}
+	}
+	return true
+}
+
func TestDeviceHandler_GetChildDevice(t *testing.T) {
dh1 := newMockDeviceHandler()
dh2 := negativeDeviceHandler()
@@ -287,49 +317,72 @@
devicehandler *DeviceHandler
args args
want *voltha.Device
+ errType reflect.Type
}{
{"GetChildDevice-1", dh1,
args{parentPort: 1,
onuID: 1},
- &voltha.Device{},
+ &voltha.Device{
+ Id: "1",
+ ParentId: "olt",
+ ParentPortNo: 1,
+ },
+ nil,
},
{"GetChildDevice-2", dh2,
args{parentPort: 1,
onuID: 1},
- &voltha.Device{},
+ nil,
+ reflect.TypeOf(&ErrNotFound{}),
},
}
+
+ /*
+ --- FAIL: TestDeviceHandler_GetChildDevice/GetChildDevice-1 (0.00s)
+ device_handler_test.go:309: GetportLabel() => want=(, <nil>) got=(id:"1" parent_id:"olt" parent_port_no:1 proxy_address:<channel_id:1 channel_group_id:1 onu_id:1 > oper_status:ACTIVE connect_status:UNREACHABLE ports:<port_no:1 label:"pon" > ports:<port_no:2 label:"uni" > pm_configs:<id:"olt" default_freq:10 > , <nil>)
+ --- FAIL: TestDeviceHandler_GetChildDevice/GetChildDevice-2 (0.00s)
+ */
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got := tt.devicehandler.GetChildDevice(tt.args.parentPort, tt.args.onuID)
+ got, err := tt.devicehandler.GetChildDevice(tt.args.parentPort, tt.args.onuID)
+ if reflect.TypeOf(err) != tt.errType || !sparseCompare([]string{"Id", "ParentId", "ParentPortNo"}, tt.want, got) {
+ t.Errorf("GetChildDevice() => want=(%v, %v) got=(%v, %v)",
+ tt.want, tt.errType, got, reflect.TypeOf(err))
+ return
+ }
t.Log("onu device id", got)
})
}
}
func TestGetportLabel(t *testing.T) {
+ invalid := reflect.TypeOf(&ErrInvalidValue{})
type args struct {
portNum uint32
portType voltha.Port_PortType
}
tests := []struct {
- name string
- args args
- want string
+ name string
+ args args
+ want string
+ errType reflect.Type
}{
- {"GetportLabel-1", args{portNum: 0, portType: 0}, ""},
- {"GetportLabel-2", args{portNum: 1, portType: 1}, "nni-1"},
- {"GetportLabel-3", args{portNum: 2, portType: 2}, ""},
- {"GetportLabel-4", args{portNum: 3, portType: 3}, "pon-3"},
- {"GetportLabel-5", args{portNum: 4, portType: 4}, ""},
- {"GetportLabel-6", args{portNum: 5, portType: 5}, ""},
- {"GetportLabel-7", args{portNum: 6, portType: 6}, ""},
+ {"GetportLabel-1", args{portNum: 0, portType: 0}, "", invalid},
+ {"GetportLabel-2", args{portNum: 1, portType: 1}, "nni-1", nil},
+ {"GetportLabel-3", args{portNum: 2, portType: 2}, "", invalid},
+ {"GetportLabel-4", args{portNum: 3, portType: 3}, "pon-3", nil},
+ {"GetportLabel-5", args{portNum: 4, portType: 4}, "", invalid},
+ {"GetportLabel-6", args{portNum: 5, portType: 5}, "", invalid},
+ {"GetportLabel-7", args{portNum: 6, portType: 6}, "", invalid},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := GetportLabel(tt.args.portNum, tt.args.portType); got != tt.want {
- t.Errorf("GetportLabel() = %v, want %v", got, tt.want)
+ got, err := GetportLabel(tt.args.portNum, tt.args.portType)
+ if reflect.TypeOf(err) != tt.errType || got != tt.want {
+ t.Errorf("GetportLabel() => want=(%v, %v) got=(%v, %v)",
+ tt.want, tt.errType, got, reflect.TypeOf(err))
}
+
})
}
}
@@ -366,80 +419,86 @@
type args struct {
msg *ic.InterAdapterMessage
}
+ invalid := reflect.TypeOf(&ErrInvalidValue{})
tests := []struct {
name string
args args
- wantErr bool
+ wantErr reflect.Type
}{
{"ProcessInterAdapterMessage-1", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 0,
+ Type: ic.InterAdapterMessageType_FLOW_REQUEST,
},
Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-2", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 1,
+ Type: ic.InterAdapterMessageType_FLOW_RESPONSE,
},
Body: marshalledData1,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-3", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 2,
+ Type: ic.InterAdapterMessageType_OMCI_REQUEST,
},
Body: marshalledData,
- }}, false},
+ }}, reflect.TypeOf(&ErrCommunication{})},
{"ProcessInterAdapterMessage-4", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 3,
+ Type: ic.InterAdapterMessageType_OMCI_RESPONSE,
}, Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-5", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 4,
+ Type: ic.InterAdapterMessageType_METRICS_REQUEST,
}, Body: marshalledData1,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-6", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 4,
+ Type: ic.InterAdapterMessageType_METRICS_RESPONSE,
}, Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-7", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 5,
+ Type: ic.InterAdapterMessageType_ONU_IND_REQUEST,
}, Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-8", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 6,
+ Type: ic.InterAdapterMessageType_ONU_IND_RESPONSE,
}, Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-9", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 7,
+ Type: ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
}, Body: marshalledData,
- }}, false},
+ }}, invalid},
{"ProcessInterAdapterMessage-10", args{msg: &ic.InterAdapterMessage{
Header: &ic.InterAdapterHeader{
Id: "012345",
- Type: 7,
+ Type: ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST,
}, Body: marshalledData2,
- }}, false},
- //marshalledData2
+ }}, invalid},
+ {"ProcessInterAdapterMessage-11", args{msg: &ic.InterAdapterMessage{
+ Header: &ic.InterAdapterHeader{
+ Id: "012345",
+ Type: ic.InterAdapterMessageType_DELETE_TCONT_REQUEST,
+ }, Body: marshalledData2,
+ }}, invalid},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := dh.ProcessInterAdapterMessage(tt.args.msg); (err != nil) != tt.wantErr {
+ if err := dh.ProcessInterAdapterMessage(tt.args.msg); reflect.TypeOf(err) != tt.wantErr {
t.Errorf("DeviceHandler.ProcessInterAdapterMessage() error = %v, wantErr %v", err, tt.wantErr)
}
})
diff --git a/adaptercore/error.go b/adaptercore/error.go
new file mode 100644
index 0000000..b696529
--- /dev/null
+++ b/adaptercore/error.go
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package adaptercore provides the utility for olt devices, flows and statistics
+package adaptercore
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "strings"
+)
+
+const (
+ defaultLogAndReturnLevel = log.DebugLevel
+)
+
+func copy(src log.Fields) log.Fields {
+ dst := make(log.Fields)
+ for k, v := range src {
+ dst[k] = v
+ }
+ return dst
+}
+
+func merge(one, two log.Fields) log.Fields {
+ dst := make(log.Fields)
+ for k, v := range one {
+ dst[k] = v
+ }
+ for k, v := range two {
+ dst[k] = v
+ }
+ return dst
+}
+
+// LoggableError defines functions that can be used to log an error
+type LoggableError interface {
+ error
+ Log() error
+ LogAt(int) error
+}
+
+// ErrAdapter represents a basic adapter error that combines a name, field set
+// and wrapped error
+type ErrAdapter struct {
+ name string
+ fields log.Fields
+ wrapped error
+}
+
+// NewErrAdapter constructs a new error with the given values
+func NewErrAdapter(name string, fields log.Fields, wrapped error) LoggableError {
+ return &ErrAdapter{
+ name: name,
+ fields: copy(fields),
+ wrapped: wrapped,
+ }
+}
+
+// Name returns the error name
+func (e *ErrAdapter) Name() string {
+ return e.name
+}
+
+// Fields returns the fields associated with the error
+func (e *ErrAdapter) Fields() log.Fields {
+ return e.fields
+}
+
+// Unwrap returns the wrapped or nested error
+func (e *ErrAdapter) Unwrap() error {
+ return e.wrapped
+}
+
+// Error returns a string representation of the error
+func (e *ErrAdapter) Error() string {
+ var buf strings.Builder
+ buf.WriteString(e.name)
+ if len(e.fields) > 0 {
+ if val, err := json.Marshal(e.fields); err == nil {
+ buf.WriteString(": [")
+ buf.WriteString(string(val))
+ buf.WriteString("]")
+ }
+ }
+ if e.wrapped != nil {
+ buf.WriteString(": ")
+ buf.WriteString(e.wrapped.Error())
+ }
+ return buf.String()
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrAdapter) Log() error {
+ return e.LogAt(defaultLogAndReturnLevel)
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrAdapter) LogAt(level int) error {
+ logger := log.Debugw
+ switch level {
+ case log.InfoLevel:
+ logger = log.Infow
+ case log.WarnLevel:
+ logger = log.Warnw
+ case log.ErrorLevel:
+ logger = log.Errorw
+ case log.FatalLevel:
+ logger = log.Fatalw
+ }
+ local := e.fields
+ if e.wrapped != nil {
+ local = merge(e.fields, log.Fields{"wrapped": e.wrapped})
+ }
+ logger(e.name, local)
+ return e
+}
+
+// ErrInvalidValue represents an error condition where a given value is not able
+// to be processed
+type ErrInvalidValue struct {
+ ErrAdapter
+}
+
+// NewErrInvalidValue constructs a new error based on the given values
+func NewErrInvalidValue(fields log.Fields, wrapped error) LoggableError {
+ return &ErrInvalidValue{
+ ErrAdapter{
+ name: "invalid-value",
+ fields: copy(fields),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrInvalidValue) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrInvalidValue) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+// ErrNotFound represents an error condition when a value can not be located
+// given a field set of criteria
+type ErrNotFound struct {
+ ErrAdapter
+}
+
+// NewErrNotFound constructs a new error based on the given values
+func NewErrNotFound(target string, fields log.Fields, wrapped error) LoggableError {
+ return &ErrNotFound{
+ ErrAdapter{
+ name: "not-found",
+ fields: merge(fields, log.Fields{"target": target}),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrNotFound) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrNotFound) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+// ErrPersistence represents an error condition when a persistence operation
+// did not succeed
+type ErrPersistence struct {
+ ErrAdapter
+}
+
+// NewErrPersistence constructs a new error based on the given values
+func NewErrPersistence(operation, entityType string, ID uint32, fields log.Fields, wrapped error) LoggableError {
+ return &ErrPersistence{
+ ErrAdapter{
+ name: "unable-to-persist",
+ fields: merge(fields, log.Fields{
+ "operation": operation,
+ "entity-type": entityType,
+ "id": fmt.Sprintf("0x%x", ID)}),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrPersistence) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrPersistence) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+// ErrCommunication represents an error condition when an interprocess
+// message communication fails
+type ErrCommunication struct {
+ ErrAdapter
+}
+
+// NewErrCommunication constructs a new error based on the given values
+func NewErrCommunication(operation string, fields log.Fields, wrapped error) LoggableError {
+ return &ErrCommunication{
+ ErrAdapter{
+ name: "failed-communication",
+ fields: merge(fields, log.Fields{
+ "operation": operation}),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrCommunication) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrCommunication) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+// ErrFlowOp represents an error condition when a flow operation to a device did
+// not succeed
+type ErrFlowOp struct {
+ ErrAdapter
+}
+
+// NewErrFlowOp constructs a new ErrFlowOp; operation and the hex-formatted ID are merged into fields
+func NewErrFlowOp(operation string, ID uint32, fields log.Fields, wrapped error) LoggableError {
+	return &ErrFlowOp{
+		ErrAdapter{
+			name: "unable-to-perform-flow-operation",
+			fields: merge(fields, log.Fields{
+				"operation": operation,
+				"id":        fmt.Sprintf("0x%x", ID)}),
+			wrapped: wrapped,
+		},
+	}
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrFlowOp) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrFlowOp) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+// ErrTimeout represents an error condition when the deadline for performing an
+// operation has been exceeded
+type ErrTimeout struct {
+ ErrAdapter
+}
+
+// NewErrTimeout constructs a new error based on the given values
+func NewErrTimeout(operation string, fields log.Fields, wrapped error) LoggableError {
+ return &ErrTimeout{
+ ErrAdapter{
+ name: "operation-timed-out",
+ fields: merge(fields, log.Fields{"operation": operation}),
+ wrapped: wrapped,
+ },
+ }
+}
+
+// Log logs the error at the default level for log and return
+func (e *ErrTimeout) Log() error {
+ _ = e.ErrAdapter.Log()
+ return e
+}
+
+// LogAt logs the error at the specified level and then returns the error
+func (e *ErrTimeout) LogAt(level int) error {
+ _ = e.ErrAdapter.LogAt(level)
+ return e
+}
+
+var (
+	// ErrNotImplemented error returned when an unimplemented method is
+	// invoked
+	ErrNotImplemented = NewErrAdapter("not-implemented", nil, nil)
+
+	// ErrInvalidPortRange error returned when a given port is not in the
+	// valid range
+	ErrInvalidPortRange = NewErrAdapter("invalid-port-range", nil, nil)
+
+	// ErrStateTransition error returned when a state transition fails
+	ErrStateTransition = NewErrAdapter("state-transition", nil, nil)
+
+	// ErrResourceManagerInstantiating error returned when an unexpected
+	// condition occurs while instantiating the resource manager
+	ErrResourceManagerInstantiating = NewErrAdapter("resource-manager-instantiating", nil, nil)
+)
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index efc0029..bca0584 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -18,8 +18,6 @@
package adaptercore
import (
- "errors"
-
"github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -107,6 +105,12 @@
ponIntfMarkerPos = 28
//Value of marker used to distinguish PON port type of OF port
ponIntfMarkerValue = 0x2
+ // Number of bits for NNI ID
+ bitsforNNIID = 20
+ // minNniIntPortNum is used to store start range of nni port number (1 << 20) 1048576
+ minNniIntPortNum = (1 << bitsforNNIID)
+ // maxNniPortNum is used to store the maximum range of nni port number ((1 << 21)-1) 2097151
+ maxNniPortNum = ((1 << (bitsforNNIID + 1)) - 1)
)
//MinUpstreamPortID value
@@ -164,8 +168,12 @@
}
//IntfIDFromNniPortNum returns Intf ID derived from portNum
-func IntfIDFromNniPortNum(portNum uint32) uint32 {
- return portNum & 0xFFFF
+func IntfIDFromNniPortNum(portNum uint32) (uint32, error) {
+ if portNum < minNniIntPortNum || portNum > maxNniPortNum {
+ log.Errorw("NNIPortNumber is not in valid range", log.Fields{"portNum": portNum})
+ return uint32(0), ErrInvalidPortRange
+ }
+ return (portNum & 0xFFFF), nil
}
//IntfIDToPortTypeName returns port type derived from the intfId
@@ -252,7 +260,8 @@
}
if uniPortNo == 0 {
- return 0, 0, 0, 0, 0, 0, errors.New("failed to extract Pon Interface, ONU Id and Uni Id from flow")
+ return 0, 0, 0, 0, 0, 0, NewErrNotFound("pon-interface", log.Fields{
+ "flow-direction": flowDirection}, nil)
}
ponIntf = IntfIDFromUniPortNum(uniPortNo)
diff --git a/adaptercore/olt_platform_test.go b/adaptercore/olt_platform_test.go
index 141e8bd..536e13a 100644
--- a/adaptercore/olt_platform_test.go
+++ b/adaptercore/olt_platform_test.go
@@ -165,22 +165,31 @@
}
tests := []struct {
- name string
- args args
- want uint32
+ name string
+ args args
+ want uint32
+ wantErr error
}{
// TODO: Add test cases.
- {"IntfIDFromNniPortNum-1", args{portNum: 8081}, 8081},
- {"IntfIDFromNniPortNum-2", args{portNum: 9090}, 9090},
- {"IntfIDFromNniPortNum-3", args{portNum: 0}, 0},
- {"IntfIDFromNniPortNum-3", args{portNum: 65535}, 65535},
+ {"IntfIDFromNniPortNum-01", args{portNum: 8081}, 0, ErrInvalidPortRange},
+ {"IntfIDFromNniPortNum-02", args{portNum: 9090}, 0, ErrInvalidPortRange},
+ {"IntfIDFromNniPortNum-03", args{portNum: 0}, 0, ErrInvalidPortRange},
+ {"IntfIDFromNniPortNum-04", args{portNum: 65535}, 0, ErrInvalidPortRange},
+ {"IntfIDFromNniPortNum-05", args{portNum: 1048575}, 0, ErrInvalidPortRange},
+ {"IntfIDFromNniPortNum-06", args{portNum: 1048576}, 0, nil},
+ {"IntfIDFromNniPortNum-07", args{portNum: 1048577}, 1, nil},
+ {"IntfIDFromNniPortNum-08", args{portNum: 1048578}, 2, nil},
+ {"IntfIDFromNniPortNum-09", args{portNum: 1048579}, 3, nil},
+ {"IntfIDFromNniPortNum-10", args{portNum: 2097150}, 65534, nil},
+ {"IntfIDFromNniPortNum-11", args{portNum: 2097151}, 65535, nil},
+ {"IntfIDFromNniPortNum-12", args{portNum: 3000000}, 0, ErrInvalidPortRange},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := IntfIDFromNniPortNum(tt.args.portNum); got != tt.want {
- t.Errorf("IntfIDFromNniPortNum() = %v, want %v", got, tt.want)
- } else {
- t.Logf("Expected %v , Actual %v \n", tt.want, got)
+ got, err := IntfIDFromNniPortNum(tt.args.portNum)
+ if got != tt.want || err != tt.wantErr {
+ t.Errorf("IntfIDFromNniPortNum(): FOR[%v] WANT[%v and %v] GOT[%v and %v]",
+ tt.args.portNum, tt.want, tt.wantErr, got, err)
}
})
}
diff --git a/adaptercore/olt_state_transitions_test.go b/adaptercore/olt_state_transitions_test.go
index 6c256a1..c79aeb0 100644
--- a/adaptercore/olt_state_transitions_test.go
+++ b/adaptercore/olt_state_transitions_test.go
@@ -18,7 +18,6 @@
import (
"context"
- "errors"
"reflect"
"testing"
"time"
@@ -48,7 +47,7 @@
after: []TransitionHandler{func(ctx context.Context) error {
return nil
}, func(ctx context.Context) error {
- return errors.New("transition error")
+ return ErrStateTransition
}},
}
transitions[GrpcConnected] = transition
@@ -66,7 +65,7 @@
before: []TransitionHandler{func(ctx context.Context) error {
return nil
}, func(ctx context.Context) error {
- return errors.New("transition error")
+ return ErrStateTransition
}},
}
transitions[GrpcConnected] = transition
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 15e8aae..62e34c9 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -19,8 +19,6 @@
import (
"context"
- "errors"
- "fmt"
"sync"
"time"
@@ -142,8 +140,7 @@
func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {
ctx := context.Background()
if device == nil {
- log.Warn("device-is-nil")
- return errors.New("nil-device")
+ return NewErrInvalidValue(log.Fields{"device": nil}, nil).Log()
}
log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
var handler *DeviceHandler
@@ -163,8 +160,7 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.GetOfpDeviceInfo(device)
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return nil, errors.New("device-handler-not-set")
+ return nil, NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Get_ofp_port_info returns OFP port information for the given device
@@ -173,8 +169,7 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.GetOfpPortInfo(device, portNo)
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return nil, errors.New("device-handler-not-set")
+ return nil, NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Process_inter_adapter_message sends messages to a target device (between adapters)
@@ -188,30 +183,29 @@
if handler := oo.getDeviceHandler(targetDevice); handler != nil {
return handler.ProcessInterAdapterMessage(msg)
}
- return fmt.Errorf(fmt.Sprintf("handler-not-found-%s", targetDevice))
+ return NewErrNotFound("device-handler", log.Fields{"device-id": targetDevice}, nil).Log()
}
//Adapter_descriptor not implemented
func (oo *OpenOLT) Adapter_descriptor() error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Device_types unimplemented
func (oo *OpenOLT) Device_types() (*voltha.DeviceTypes, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Health returns unimplemented
func (oo *OpenOLT) Health() (*voltha.HealthStatus, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Reconcile_device unimplemented
func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {
ctx := context.Background()
if device == nil {
- log.Warn("device-is-nil")
- return errors.New("nil-device")
+ return NewErrInvalidValue(log.Fields{"device": nil}, nil).Log()
}
log.Infow("reconcile-device", log.Fields{"deviceId": device.Id})
var handler *DeviceHandler
@@ -226,7 +220,7 @@
//Abandon_device unimplemented
func (oo *OpenOLT) Abandon_device(device *voltha.Device) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Disable_device disables the given device
@@ -235,8 +229,7 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.DisableDevice(device)
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return errors.New("device-handler-not-found")
+ return NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Reenable_device enables the olt device after disable
@@ -245,8 +238,7 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.ReenableDevice(device)
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return errors.New("device-handler-not-found")
+ return NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Reboot_device reboots the given device
@@ -255,14 +247,12 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.RebootDevice(device)
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return errors.New("device-handler-not-found")
-
+ return NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Self_test_device unimplented
func (oo *OpenOLT) Self_test_device(device *voltha.Device) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Delete_device unimplemented
@@ -276,18 +266,17 @@
oo.deleteDeviceHandlerToMap(handler)
return nil
}
- log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
- return errors.New("device-handler-not-found")
+ return NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Get_device_details unimplemented
func (oo *OpenOLT) Get_device_details(device *voltha.Device) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Update_flows_bulk returns
func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Update_flows_incrementally updates (add/remove) the flows on a given device
@@ -297,13 +286,12 @@
if handler := oo.getDeviceHandler(device.Id); handler != nil {
return handler.UpdateFlowsIncrementally(ctx, device, flows, groups, flowMetadata)
}
- log.Errorw("Update_flows_incrementally failed-device-handler-not-set", log.Fields{"deviceId": device.Id})
- return errors.New("device-handler-not-set")
+ return NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil).Log()
}
//Update_pm_config returns PmConfigs nil or error
func (oo *OpenOLT) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Receive_packet_out sends packet out to the device
@@ -313,43 +301,42 @@
if handler := oo.getDeviceHandler(deviceID); handler != nil {
return handler.PacketOut(ctx, egressPortNo, packet)
}
- log.Errorw("Receive_packet_out failed-device-handler-not-set", log.Fields{"deviceId": deviceID, "egressport": egressPortNo, "packet": packet})
- return errors.New("device-handler-not-set")
+ return NewErrNotFound("device-handler", log.Fields{"device-id": deviceID}, nil).Log()
}
//Suppress_event unimplemented
func (oo *OpenOLT) Suppress_event(filter *voltha.EventFilter) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Unsuppress_event unimplemented
func (oo *OpenOLT) Unsuppress_event(filter *voltha.EventFilter) error {
- return errors.New("unImplemented")
+ return ErrNotImplemented
}
//Download_image unimplemented
func (oo *OpenOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Get_image_download_status unimplemented
func (oo *OpenOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Cancel_image_download unimplemented
func (oo *OpenOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Activate_image_update unimplemented
func (oo *OpenOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
//Revert_image_update unimplemented
func (oo *OpenOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- return nil, errors.New("unImplemented")
+ return nil, ErrNotImplemented
}
// Enable_port to Enable PON/NNI interface
@@ -368,8 +355,10 @@
func (oo *OpenOLT) enableDisablePort(deviceID string, port *voltha.Port, enablePort bool) error {
log.Infow("enableDisablePort", log.Fields{"deviceId": deviceID, "port": port})
if port == nil {
- log.Errorw("port-cannot-be-nil", log.Fields{"Device": deviceID, "port": port})
- return errors.New("sent-port-cannot-be-nil")
+ return NewErrInvalidValue(log.Fields{
+ "reason": "port cannot be nil",
+ "device-id": deviceID,
+ "port": nil}, nil).Log()
}
if handler := oo.getDeviceHandler(deviceID); handler != nil {
log.Debugw("Enable_Disable_Port", log.Fields{"deviceId": deviceID, "port": port})
diff --git a/adaptercore/openolt_eventmgr.go b/adaptercore/openolt_eventmgr.go
index 1ab684f..b5650ed 100644
--- a/adaptercore/openolt_eventmgr.go
+++ b/adaptercore/openolt_eventmgr.go
@@ -94,40 +94,33 @@
}
// ProcessEvents is function to process and publish OpenOLT event
-func (em *OpenOltEventMgr) ProcessEvents(alarmInd *oop.AlarmIndication, deviceID string, raisedTs int64) {
+func (em *OpenOltEventMgr) ProcessEvents(alarmInd *oop.AlarmIndication, deviceID string, raisedTs int64) error {
var err error
switch alarmInd.Data.(type) {
case *oop.AlarmIndication_LosInd:
log.Infow("Received LOS indication", log.Fields{"alarm_ind": alarmInd})
err = em.oltLosIndication(alarmInd.GetLosInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuAlarmInd:
log.Infow("Received onu alarm indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuAlarmIndication(alarmInd.GetOnuAlarmInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_DyingGaspInd:
log.Infow("Received dying gasp indication", log.Fields{"alarm_ind": alarmInd})
err = em.onuDyingGaspIndication(alarmInd.GetDyingGaspInd(), deviceID, raisedTs)
case *oop.AlarmIndication_OnuActivationFailInd:
log.Infow("Received onu activation fail indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuActivationFailIndication(alarmInd.GetOnuActivationFailInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuLossOmciInd:
log.Infow("Received onu loss omci indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuLossOmciIndication(alarmInd.GetOnuLossOmciInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuDriftOfWindowInd:
log.Infow("Received onu drift of window indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuDriftOfWindowIndication(alarmInd.GetOnuDriftOfWindowInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuSignalDegradeInd:
log.Infow("Received onu signal degrade indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuSignalDegradeIndication(alarmInd.GetOnuSignalDegradeInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuSignalsFailInd:
log.Infow("Received onu signal fail indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuSignalsFailIndication(alarmInd.GetOnuSignalsFailInd(), deviceID, raisedTs)
-
case *oop.AlarmIndication_OnuStartupFailInd:
log.Infow("Received onu startup fail indication ", log.Fields{"alarm_ind": alarmInd})
err = em.onuStartupFailedIndication(alarmInd.GetOnuStartupFailInd(), deviceID, raisedTs)
@@ -141,12 +134,12 @@
log.Infow("Received onu Itu Pon Stats indication ", log.Fields{"alarm_ind": alarmInd})
log.Infow("Not implemented yet", log.Fields{"alarm_ind": alarmInd})
default:
- log.Errorw("Received unknown indication type", log.Fields{"alarm_ind": alarmInd})
-
+ err = NewErrInvalidValue(log.Fields{"indication-type": alarmInd}, nil)
}
if err != nil {
- log.Errorw("Failed to publish message to KAFKA", log.Fields{"error": err})
+ return NewErrCommunication("publish-message", log.Fields{"indication-type": alarmInd}, err).Log()
}
+ return nil
}
// oltUpDownIndication handles Up and Down state of an OLT
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index c817455..5a4225a 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -22,7 +22,6 @@
"crypto/md5"
"encoding/hex"
"encoding/json"
- "errors"
"fmt"
"math/big"
"sync"
@@ -268,8 +267,7 @@
log.Debug("multicast flow, shifting id")
return 0x2<<15 | uint64(flowID), nil
} else {
- log.Debug("Unrecognized direction")
- return 0, fmt.Errorf("unrecognized direction %s", direction)
+ return 0, NewErrInvalidValue(log.Fields{"direction": direction}, nil).Log()
}
}
@@ -362,8 +360,10 @@
log.Debug("Scheduler already created for upstream")
return nil
}
- log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": sq.meterID})
- return errors.New("invalid-meter-id-in-flow")
+ return NewErrInvalidValue(log.Fields{
+ "unsupported": "meter-id",
+ "kv-store-meter-id": KvStoreMeter.MeterId,
+ "meter-id-in-flow": sq.meterID}, nil).Log()
}
log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": sq.meterID, "Direction": Direction})
@@ -392,12 +392,17 @@
log.Error("Flow-metadata-is-not-present-in-flow")
}
if meterConfig == nil {
- log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": sq.flowMetadata,
- "MeterID": sq.meterID})
- return errors.New("failed-to-get-meter-from-flowMetadata")
+ return NewErrNotFound("meterbands", log.Fields{
+ "reason": "Could-not-get-meterbands-from-flowMetadata",
+ "flow-metadata": sq.flowMetadata,
+ "meter-id": sq.meterID}, nil).Log()
} else if len(meterConfig.Bands) < MaxMeterBand {
log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": sq.meterID})
- return errors.New("invalid-number-of-bands-in-meter")
+ return NewErrInvalidValue(log.Fields{
+ "reason": "Invalid-number-of-bands-in-meter",
+ "meterband-count": len(meterConfig.Bands),
+ "metabands": meterConfig.Bands,
+ "meter-id": sq.meterID}, nil).Log()
}
cir := meterConfig.Bands[0].Rate
cbs := meterConfig.Bands[0].BurstSize
@@ -652,9 +657,10 @@
}
//Make sure we have as many tech_profiles as there are pon ports on the device
if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
- log.Errorw("Error while populating techprofile",
- log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
- return errors.New("error while populating techprofile mgrs")
+ return NewErrInvalidValue(log.Fields{
+ "reason": "TP count does not match number of PON ports",
+ "tech-profile-count": tpCount,
+ "pon-port-count": f.resourceMgr.DevInfo.GetPonPorts()}, nil).Log()
}
log.Infow("Populated techprofile for ponports successfully",
log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
@@ -664,10 +670,10 @@
func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, uplinkClassifier map[string]interface{},
uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32) {
+ allocID uint32, gemportID uint32) error {
uplinkClassifier[PacketTagType] = SingleTag
log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
- f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+ return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
Upstream, logicalFlow, allocID, gemportID)
/* TODO: Install Secondary EAP on the subscriber vlan */
}
@@ -675,7 +681,7 @@
func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, downlinkClassifier map[string]interface{},
downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32) {
+ allocID uint32, gemportID uint32) error {
downlinkClassifier[PacketTagType] = DoubleTag
log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
"downlinkAction": downlinkAction})
@@ -685,7 +691,7 @@
if metadata, exists := downlinkClassifier[Metadata]; exists { // inport is filled in metadata by core
if uint32(metadata.(uint64)) == MkUniPortNum(intfID, onuID, uniID) {
log.Infow("Ignoring DL trap device flow from core", log.Fields{"flow": logicalFlow})
- return
+ return nil
}
}
}
@@ -698,18 +704,18 @@
if ok {
downlinkAction[VlanVid] = dlClVid & 0xfff
} else {
- log.Error("dl-classifier-vid-type-conversion-failed")
- return
+ return NewErrInvalidValue(log.Fields{
+ "reason": "failed to convert VLANID classifier",
+ "vlan-id": VlanVid}, nil).Log()
}
- f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+ return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
Downstream, logicalFlow, allocID, gemportID)
}
func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemPortID uint32) {
- var networkIntfID uint32
+ allocID uint32, gemPortID uint32) error {
/* One of the OLT platform (Broadcom BAL) requires that symmetric
flows require the same flow_id to be used across UL and DL.
Since HSIA flow is the only symmetric flow currently, we need to
@@ -724,33 +730,35 @@
if _, ok := classifier[VlanPcp]; ok {
vlanPbit = classifier[VlanPcp].(uint32)
log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
+ } else {
+ log.Debugw("bpit-not-found-in-flow", log.Fields{"vlan-pcp": VlanPcp})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("flow-already-exists")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
- log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
- return
+ return NewErrNotFound("hsia-flow-id", log.Fields{"direction": direction}, err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for hsia flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Errorw("Error in making action protobuf for hsia flow", log.Fields{"direction": direction})
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
- networkIntfID, err = getNniIntfID(classifier, action)
+ networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id",
+ log.Fields{
+ "classifier": classifier,
+ "action": action,
+ }, err).Log()
}
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
@@ -765,29 +773,28 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
- log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
- log.Errorw("Error uploading HSIA flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+ return NewErrFlowOp("add", flowID, nil, err).Log()
}
+ log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+ flow.OnuId,
+ flow.UniId,
+ flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": flow}, err).Log()
+ }
+ return nil
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
- var dhcpFlow openoltpb2.Flow
- var actionProto *openoltpb2.Action
- var classifierProto *openoltpb2.Classifier
- var flowID uint32
networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
// Clear the action map
@@ -804,29 +811,32 @@
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
- return
+ return nil
}
- flowID, err = f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
- log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
- return
+ return NewErrNotFound("flow", log.Fields{
+ "interface-id": intfID,
+ "gem-port": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
log.Debugw("Creating UL DHCP flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID})
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
- dhcpFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ dhcpFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: flowID,
@@ -840,39 +850,37 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); ok {
- log.Debug("DHCP UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
- dhcpFlow.OnuId,
- dhcpFlow.UniId,
- dhcpFlow.FlowId, flowsToKVStore); err != nil {
- log.Errorw("Error uploading DHCP UL flow into KV store", log.Fields{"flow": dhcpFlow, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
+ }
+ log.Debug("DHCP UL flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
+ dhcpFlow.OnuId,
+ dhcpFlow.UniId,
+ dhcpFlow.FlowId, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", dhcpFlow.FlowId, log.Fields{"flow": dhcpFlow}, err).Log()
}
- return
+ return nil
}
//addIGMPTrapFlow creates IGMP trap-to-host flow
func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
- f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
+ return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
}
//addUpstreamTrapFlow creates a trap-to-host flow
func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
-
- var flow openoltpb2.Flow
- var actionProto *openoltpb2.Action
- var classifierProto *openoltpb2.Classifier
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) error {
networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
// Clear the action map
@@ -886,30 +894,34 @@
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
- log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": intfID,
+ "oni-id": onuID,
+ "cookie": flowStoreCookie,
+ "flow-type": flowType},
+ err).Log()
}
log.Debugw("Creating upstream trap flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID, "flowType": flowType})
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
- flow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: flowID,
@@ -923,32 +935,29 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
- log.Debugf("%s UL flow added to device successfully", flowType)
+ if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": flow}, err).Log()
+ }
+ log.Debugf("%s UL flow added to device successfully", flowType)
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId, flowsToKVStore); err != nil {
- log.Errorw("Error uploading UL flow into KV store", log.Fields{"flow": flow, "error": err})
- return
- }
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+ flow.OnuId,
+ flow.UniId,
+ flow.FlowId, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow}, err).Log()
}
- return
+ return nil
}
// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) error {
log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
uplinkClassifier := make(map[string]interface{})
uplinkAction := make(map[string]interface{})
- var upstreamFlow openoltpb2.Flow
- var networkIntfID uint32
-
// Fill Classfier
uplinkClassifier[EthType] = uint32(EapEthType)
uplinkClassifier[PacketTagType] = SingleTag
@@ -957,36 +966,39 @@
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
//Add Uplink EAPOL Flow
uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": intfID,
+ "onu-id": onuID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
log.Debugw("Creating UL EAPOL flow", log.Fields{"ul_classifier": uplinkClassifier, "ul_action": uplinkAction, "uplinkFlowId": uplinkFlowID})
- if classifierProto = makeOpenOltClassifierField(uplinkClassifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(uplinkClassifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": uplinkClassifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(uplinkAction); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(uplinkAction)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": uplinkAction}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
- networkIntfID, err = getNniIntfID(classifier, action)
+ networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
- upstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ upstreamFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: uplinkFlowID,
@@ -999,25 +1011,26 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); ok {
- log.Debug("EAPOL UL flow added to device successfully")
- flowCategory := "EAPOL"
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
- upstreamFlow.OnuId,
- upstreamFlow.UniId,
- upstreamFlow.FlowId,
- /* lowCategory, */
- flowsToKVStore); err != nil {
- log.Errorw("Error uploading EAPOL UL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); err != nil {
+ return NewErrFlowOp("add", uplinkFlowID, log.Fields{"flow": upstreamFlow}, err).Log()
+ }
+ log.Debug("EAPOL UL flow added to device successfully")
+ flowCategory := "EAPOL"
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
+ upstreamFlow.OnuId,
+ upstreamFlow.UniId,
+ upstreamFlow.FlowId,
+ /* lowCategory, */
+ flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", upstreamFlow.FlowId, log.Fields{"flow": upstreamFlow}, err).Log()
}
log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
+ return nil
}
-func makeOpenOltClassifierField(classifierInfo map[string]interface{}) *openoltpb2.Classifier {
+func makeOpenOltClassifierField(classifierInfo map[string]interface{}) (*openoltpb2.Classifier, error) {
var classifier openoltpb2.Classifier
classifier.EthType, _ = classifierInfo[EthType].(uint32)
@@ -1054,14 +1067,13 @@
case DoubleTag:
case Untagged:
default:
- log.Error("Invalid tag type in classifier") // should not hit
- return nil
+ return nil, NewErrInvalidValue(log.Fields{"packet-tag-type": pktTagType}, nil).Log()
}
}
- return &classifier
+ return &classifier, nil
}
-func makeOpenOltActionField(actionInfo map[string]interface{}) *openoltpb2.Action {
+func makeOpenOltActionField(actionInfo map[string]interface{}) (*openoltpb2.Action, error) {
var actionCmd openoltpb2.ActionCmd
var action openoltpb2.Action
action.Cmd = &actionCmd
@@ -1074,10 +1086,9 @@
} else if _, ok := actionInfo[TrapToHost]; ok {
action.Cmd.TrapToHost = actionInfo[TrapToHost].(bool)
} else {
- log.Errorw("Invalid-action-field", log.Fields{"action": actionInfo})
- return nil
+ return nil, NewErrInvalidValue(log.Fields{"action-command": actionInfo}, nil).Log()
}
- return &action
+ return &action, nil
}
func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string, TpID uint32) string {
@@ -1194,7 +1205,7 @@
return nil
}
-func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1213,37 +1224,37 @@
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
- return true
+ return nil
}
if err != nil {
log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": deviceFlow})
f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
- return false
+ return err
}
if deviceFlow.GemportId != -1 {
// No need to register the flow if it is a trap on nni flow.
f.registerFlow(ctx, logicalFlow, deviceFlow)
}
log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
- return true
+ return nil
}
-func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) error {
log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
_, err := f.deviceHandler.Client.FlowRemove(context.Background(), deviceFlow)
if err != nil {
if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
log.Warnw("Can not remove flow from device since it's unreachable", log.Fields{"err": err, "deviceFlow": deviceFlow})
//Assume the flow is removed
- return true
+ return nil
}
log.Errorw("Failed to Remove flow from device", log.Fields{"err": err, "deviceFlow": deviceFlow})
- return false
+ return err
}
log.Debugw("Flow removed from device successfully ", log.Fields{"flow": *deviceFlow})
- return true
+ return nil
}
/*func register_flow(deviceFlow *openolt_pb2.Flow, logicalFlow *ofp.OfpFlowStats){
@@ -1266,7 +1277,7 @@
*/
-func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) {
+func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
@@ -1291,28 +1302,34 @@
var uniID = -1
var gemPortID = -1
- var networkInterfaceID = IntfIDFromNniPortNum(portNo)
+ networkInterfaceID, err := IntfIDFromNniPortNum(portNo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
+ }
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
- return
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
- log.Error("Error in making classifier protobuf for LLDP trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(actionInfo); actionProto == nil {
- log.Error("Error in making action protobuf for LLDP trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(actionInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": actionInfo}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
@@ -1328,17 +1345,18 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, flow, &downstreamflow); ok {
- log.Debug("LLDP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading LLDP flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, flow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("LLDP trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
func getUniPortPath(intfID uint32, onuID int32, uniID int32) string {
@@ -1349,10 +1367,12 @@
func (f *OpenOltFlowMgr) getOnuChildDevice(intfID uint32, onuID uint32) (*voltha.Device, error) {
log.Debugw("GetChildDevice", log.Fields{"pon port": intfID, "onuId": onuID})
parentPortNo := IntfIDToPortNo(intfID, voltha.Port_PON_OLT)
- onuDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
- if onuDevice == nil {
- log.Errorw("onu not found", log.Fields{"intfId": parentPortNo, "onuId": onuID})
- return nil, errors.New("onu not found")
+ onuDevice, err := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
+ if err != nil {
+ return nil, NewErrNotFound("onu", log.Fields{
+ "interface-id": parentPortNo,
+ "onu-id": onuID},
+ err).Log()
}
log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
return onuDevice, nil
@@ -1575,6 +1595,7 @@
return nil
}
+// nolint: gocyclo
func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
@@ -1585,8 +1606,6 @@
}
var updatedFlows []rsrcMgr.FlowInfo
- var flowID uint32
- var onuID, uniID int32
classifierInfo := make(map[string]interface{})
portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
@@ -1595,8 +1614,8 @@
return
}
- onuID = int32(onu)
- uniID = int32(uni)
+ onuID := int32(onu)
+ uniID := int32(uni)
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.IP_PROTO {
@@ -1611,10 +1630,17 @@
onuID = -1
uniID = -1
log.Debug("Trap on nni flow set oni, uni to -1")
- Intf = IntfIDFromNniPortNum(inPort)
+ Intf, err = IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ log.Errorw("invalid-in-port-number",
+ log.Fields{
+ "port-number": inPort,
+ "error": err})
+ return
+ }
}
flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
- for _, flowID = range flowIds {
+ for _, flowID := range flowIds {
flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No FlowInfo found found in KV store",
@@ -1630,18 +1656,17 @@
if flow.Id == storedFlow.LogicalFlowID {
removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
log.Debugw("Flow to be deleted", log.Fields{"flow": storedFlow})
- if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
- log.Debug("Flow removed from device successfully")
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
- flowID, flowDirection, portNum, updatedFlows)
- if err != nil {
- log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
- return
- }
- } else {
- log.Error("Failed to remove flow from device")
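+ // Abort if the flow cannot be removed from the device; resources are only cleared after a successful removal.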
+ if err = f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+ log.Errorw("failed-to-remove-flow", log.Fields{"error": err})
+ return
+ }
+ log.Debug("Flow removed from device successfully")
+ //Remove the Flow from FlowInfo
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+ flowID, flowDirection, portNum, updatedFlows); err != nil {
+ log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
return
}
}
@@ -1661,7 +1686,15 @@
return
}
- networkInterfaceID := IntfIDFromNniPortNum(inPort)
+ networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ // The in-port could not be mapped to an NNI interface ID; log the failure and give up on this flow.
+ log.Errorw("invalid-in-port-number",
+ log.Fields{
+ "port-number": inPort,
+ "error": err})
+ return
+ }
var onuID = int32(NoneOnuID)
var uniID = int32(NoneUniID)
var flowID uint32
@@ -1685,8 +1718,12 @@
removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
log.Debugw("Multicast flow to be deleted", log.Fields{"flow": storedFlow})
//remove from device
- if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
- log.Errorw("Failed to remove multicast flow from device", log.Fields{"flowId": flow.Id})
+ if err := f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+ // Device-side removal of the multicast flow failed; log the error and return.
+ log.Errorw("failed-to-remove-multicast-flow",
+ log.Fields{
+ "flow-id": flow.Id,
+ "error": err})
return
}
log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
@@ -1863,14 +1900,13 @@
}
// handleFlowWithGroup adds multicast flow to the device.
-func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
classifierInfo[PacketTagType] = DoubleTag
log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
- log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
- return
+ return NewErrNotFound("multicast-in-port", log.Fields{"classifier": classifierInfo}, err).Log()
}
//replace ipDst with ethDst
if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
@@ -1883,26 +1919,33 @@
log.Debugw("multicast-ip-to-mac-conversion-success", log.Fields{"ip:": ipv4Dst.(uint32), "mac:": multicastMac})
}
- var onuID = NoneOnuID
- var uniID = NoneUniID
- var gemPortID = NoneGemPortID
+ onuID := NoneOnuID
+ uniID := NoneUniID
+ gemPortID := NoneGemPortID
- networkInterfaceID := IntfIDFromNniPortNum(inPort)
+ networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"nni-in-port-number": inPort}, err).Log()
+ }
- var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
+ flowStoreCookie := getFlowStoreCookie(classifierInfo, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
- return
+ log.Debugw("multicast-flow-exists-not-re-adding", log.Fields{"classifierInfo": classifierInfo})
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
- log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("multicast-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
- log.Error("Error in making classifier protobuf for multicast flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
}
groupID := actionInfo[GroupID].(uint32)
multicastFlow := openoltpb2.Flow{
@@ -1914,27 +1957,28 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie}
- if ok := f.addFlowToDevice(ctx, flow, &multicastFlow); ok {
- log.Debug("multicast flow added to device successfully")
- //get cached group
- group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
- if err == nil {
- //calling groupAdd to set group members after multicast flow creation
- if f.ModifyGroup(ctx, group) {
- //cached group can be removed now
- f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
- }
- }
-
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading multicast flow into KV store", log.Fields{"flow": multicastFlow, "error": err})
+ if err = f.addFlowToDevice(ctx, flow, &multicastFlow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+ }
+ log.Debug("multicast flow added to device successfully")
+ //get cached group
+ group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
+ if err == nil {
+ //calling groupAdd to set group members after multicast flow creation
+ if f.ModifyGroup(ctx, group) {
+ //cached group can be removed now
+ f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
}
}
- return
+
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+ if err = f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+ }
+ return nil
}
//getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
@@ -1947,7 +1991,7 @@
if e == nil && len(nniPorts) > 0 {
return nniPorts[0], nil
}
- return 0, errors.New("cannot find NNI port of device")
+ return 0, NewErrNotFound("nni-port", nil, e).Log()
}
// AddGroup add or update the group
@@ -2236,8 +2280,11 @@
}
}
}
- log.Errorw("onuid is not found", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPort": gemPortID})
- return uint32(0), errors.New("key error, onuid is not found") // ONU ID 0 is not a valid one
+ return uint32(0), NewErrNotFound("onu-id", log.Fields{
+ "serial-number": serialNumber,
+ "interface-id": intfID,
+ "gem-port-id": gemPortID},
+ nil).Log()
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
@@ -2302,10 +2349,10 @@
func installFlowOnAllGemports(ctx context.Context,
f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
portNo uint32, classifier map[string]interface{}, action map[string]interface{},
- logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
+ logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32) error,
f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
- classifier map[string]interface{}, action map[string]interface{}),
+ classifier map[string]interface{}, action map[string]interface{}) error,
args map[string]uint32,
classifier map[string]interface{}, action map[string]interface{},
logicalFlow *ofp.OfpFlowStats,
@@ -2325,13 +2372,11 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
log.Debug("Adding trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
classifier[PacketTagType] = DoubleTag
action[TrapToHost] = true
- var err error
- var networkInterfaceID uint32
/* We manage flowId resource pool on per PON port basis.
Since this situation is tricky, as a hack, we pass the NNI port
index (network_intf_id) as PON port Index for the flowId resource
@@ -2347,32 +2392,37 @@
uniID := -1
gemPortID := -1
allocID := -1
- networkInterfaceID, err = getNniIntfID(classifier, action)
+ networkInterfaceID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("dhcp-trap-nni-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for dhcp trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for dhcp trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2388,17 +2438,18 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
- log.Debug("DHCP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading DHCP DL flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("DHCP trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
//getPacketTypeFromClassifiers finds and returns packet type of a flow by checking flow classifiers
@@ -2428,7 +2479,7 @@
}
//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
-func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
action := make(map[string]interface{})
classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
@@ -2450,29 +2501,34 @@
allocID := -1
networkInterfaceID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("igmp-flow-exists--not-re-adding")
- return
+ log.Debug("igmp-flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
- log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("igmp-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for igmp trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto for the IGMP flow", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for IGMP trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto for the IGMP flow", log.Fields{"action": *actionProto})
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2488,23 +2544,23 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
- log.Debug("IGMP Trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading igmp-trap-on-nni flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("IGMP Trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
if MeterID == 0 { // This should never happen
- log.Error("Invalid meter id")
- return "", errors.New("invalid meter id")
+ return "", NewErrInvalidValue(log.Fields{"meter-id": MeterID}, nil).Log()
}
if Dir == tp_pb.Direction_UPSTREAM {
return "upstream", nil
@@ -2699,8 +2755,7 @@
actionInfo[Output] = out.GetPort()
log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
} else {
- log.Error("Invalid output port in action")
- return errors.New("invalid output port in action")
+ return NewErrInvalidValue(log.Fields{"output-port": nil}, nil).Log()
}
} else if action.Type == flows.POP_VLAN {
actionInfo[PopVlan] = true
@@ -2720,8 +2775,7 @@
if out := action.GetSetField(); out != nil {
if field := out.GetField(); field != nil {
if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
- log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
- return errors.New("invalid openflow class")
+ return NewErrInvalidValue(log.Fields{"openflow-class": ofClass}, nil).Log()
}
/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
formulateSetFieldActionInfoFromFlow(field, actionInfo)
@@ -2730,8 +2784,7 @@
} else if action.Type == flows.GROUP {
formulateGroupActionInfoFromFlow(action, actionInfo)
} else {
- log.Errorw("Un supported action type", log.Fields{"type": action.Type})
- return errors.New("un supported action type")
+ return NewErrInvalidValue(log.Fields{"action-type": action.Type}, nil).Log()
}
}
return nil
@@ -2770,8 +2823,9 @@
classifierInfo[InPort] = uniPort
log.Debugw("upstream pon-to-controller-flow,inport-in-tunnelid", log.Fields{"newInPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
} else {
- log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
- return errors.New("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
+ return NewErrNotFound("child-in-port", log.Fields{
+ "reason": "upstream pon-to-controller-flow, NO-inport-in-tunnelid",
+ "flow": flow}, nil).Log()
}
}
} else {
@@ -2782,8 +2836,9 @@
actionInfo[Output] = uniPort
log.Debugw("downstream-nni-to-pon-port-flow, outport-in-tunnelid", log.Fields{"newOutPort": actionInfo[Output].(uint32), "outPort": actionInfo[Output].(uint32)})
} else {
- log.Debug("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
- return errors.New("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid")
+ return NewErrNotFound("out-port", log.Fields{
+ "reason": "downstream-nni-to-pon-port-flow, no-outport-in-tunnelid",
+ "flow": flow}, nil).Log()
}
// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
} else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
@@ -2792,9 +2847,11 @@
log.Debugw("upstream-pon-to-nni-port-flow, inport-in-tunnelid", log.Fields{"newInPort": actionInfo[Output].(uint32),
"outport": actionInfo[Output].(uint32)})
} else {
- log.Debug("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32),
- "outPort": actionInfo[Output].(uint32)})
- return errors.New("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid")
+ return NewErrNotFound("nni-port", log.Fields{
+ "reason": "upstream-pon-to-nni-port-flow, no-inport-in-tunnelid",
+ "in-port": classifierInfo[InPort].(uint32),
+ "out-port": actionInfo[Output].(uint32),
+ "flow": flow}, nil).Log()
}
}
}
@@ -2811,8 +2868,7 @@
*/
metadata := flows.GetMetadataFromWriteMetadataAction(flow)
if metadata == 0 {
- log.Error("metadata-is-not-present-in-flow-which-is-mandatory")
- return 0, errors.New("metadata-is-not-present-in-flow-which-is-mandatory")
+ return 0, NewErrNotFound("metadata", log.Fields{"flow": flow}, nil).Log()
}
TpID := flows.GetTechProfileIDFromWriteMetaData(metadata)
return uint32(TpID), nil
@@ -2832,11 +2888,25 @@
portType := IntfIDToPortTypeName(classifier[InPort].(uint32))
if portType == voltha.Port_PON_OLT {
- intfID := IntfIDFromNniPortNum(action[Output].(uint32))
+ intfID, err := IntfIDFromNniPortNum(action[Output].(uint32))
+ if err != nil {
+ log.Debugw("invalid-action-port-number",
+ log.Fields{
+ "port-number": action[Output].(uint32),
+ "error": err})
+ return uint32(0), err
+ }
log.Debugw("output Nni IntfID is", log.Fields{"intfid": intfID})
return intfID, nil
} else if portType == voltha.Port_ETHERNET_NNI {
- intfID := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+ intfID, err := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+ if err != nil {
+ log.Debugw("invalid-classifier-port-number",
+ log.Fields{
+ "port-number": classifier[InPort].(uint32), // report the port that failed conversion, not the action output
+ "error": err})
+ return uint32(0), err
+ }
log.Debugw("input Nni IntfID is", log.Fields{"intfid": intfID})
return intfID, nil
}
@@ -2922,8 +2992,7 @@
func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
if err != nil {
- log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
- return nil, false, errors.New("failed to retrieve the flow group")
+ return nil, false, NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err).Log()
}
if exists {
return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index 0b6b366..940704a 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -28,6 +28,7 @@
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-openolt-adapter/config"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -134,15 +135,14 @@
args args
wantErr error
}{
- {"abandon_device-1", &fields{}, args{}, errors.New("unImplemented")},
- {"abandon_device-2", &fields{}, args{}, errors.New("unImplemented")},
- {"abandon_device-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"abandon_device-1", &fields{}, args{}, ErrNotImplemented},
+ {"abandon_device-2", &fields{}, args{}, ErrNotImplemented},
+ {"abandon_device-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Abandon_device(tt.args.device); (err != nil) && (reflect.TypeOf(err) !=
- reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Abandon_device(tt.args.device); err != tt.wantErr {
t.Errorf("Abandon_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -162,17 +162,17 @@
wantErr error
}{
{"activate_image_upate-1", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123XYZ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"activate_image_upate-2", &fields{}, args{}, &voltha.ImageDownload{Id: "Image2-ABC123CDE"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"activate_image_upate-3", &fields{}, args{}, &voltha.ImageDownload{Id: "Image3-ABC123EFG"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Activate_image_update(tt.args.device, tt.args.request)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && (got == nil) {
+ if err != tt.wantErr && got == nil {
t.Errorf("Activate_image_update() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -185,14 +185,14 @@
fields *fields
wantErr error
}{
- {"adapter_descriptor-1", &fields{}, errors.New("unImplemented")},
- {"adapter_descriptor-2", &fields{}, errors.New("unImplemented")},
- {"adapter_descriptor-3", &fields{}, errors.New("unImplemented")},
+ {"adapter_descriptor-1", &fields{}, ErrNotImplemented},
+ {"adapter_descriptor-2", &fields{}, ErrNotImplemented},
+ {"adapter_descriptor-3", &fields{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Adapter_descriptor(); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Adapter_descriptor(); err != tt.wantErr {
t.Errorf("Adapter_descriptor() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -205,16 +205,16 @@
}
var device = mockDevice()
device.Id = "olt"
+ nilDevice := NewErrInvalidValue(log.Fields{"device": nil}, nil)
tests := []struct {
name string
fields *fields
args args
wantErr error
}{
- {"adopt_device-1", mockOlt(), args{}, errors.New("nil-device")},
- {"adopt_device-2", mockOlt(), args{device}, errors.New("nil-device")},
- {"adopt_device-3", mockOlt(),
- args{mockDevice()}, nil},
+ {"adopt_device-1", mockOlt(), args{}, nilDevice},
+ {"adopt_device-2", mockOlt(), args{device}, nilDevice},
+ {"adopt_device-3", mockOlt(), args{mockDevice()}, nil},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -244,17 +244,17 @@
wantErr error
}{
{"cancel_image_download-1", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123XYZ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"cancel_image_download-2", &fields{}, args{}, &voltha.ImageDownload{Id: "Image2-ABC123IJK"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"cancel_image_download-3", &fields{}, args{}, &voltha.ImageDownload{Id: "Image3-ABC123KLM"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Cancel_image_download(tt.args.device, tt.args.request)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
+ if err != tt.wantErr && got == nil {
t.Errorf("Cancel_image_download() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -271,13 +271,13 @@
args args
wantErr error
}{
- {"delete_device-1", &fields{}, args{mockDevice()}, errors.New("device-handler-not-found")},
+ {"delete_device-1", &fields{}, args{mockDevice()},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Delete_device(tt.args.device); (err != nil) && (reflect.TypeOf(err) !=
- reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Delete_device(tt.args.device); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Delete_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -292,17 +292,17 @@
wantErr error
}{
{"device_types-1", &fields{}, &voltha.DeviceTypes{},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"device_types-2", &fields{}, &voltha.DeviceTypes{},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"device_types-3", &fields{}, &voltha.DeviceTypes{},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Device_types()
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
+ if err != tt.wantErr && got == nil {
t.Errorf("Device_types() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -319,13 +319,14 @@
args args
wantErr error
}{
- {"disable_device-1", mockOlt(), args{mockDevice()}, errors.New("device-handler-not-found")},
- {"disable_device-2", &fields{}, args{mockDevice()}, errors.New("device-handler-not-found")},
+ {"disable_device-1", mockOlt(), args{mockDevice()}, nil},
+ {"disable_device-2", &fields{}, args{mockDevice()},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Disable_device(tt.args.device); (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && err != nil {
+ if err := oo.Disable_device(tt.args.device); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Disable_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -345,17 +346,17 @@
wantErr error
}{
{"download_image-1", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123XYZ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"download_image-2", &fields{}, args{}, &voltha.ImageDownload{Id: "Image2-ABC123LKJ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"download_image-3", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123RTY"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Download_image(tt.args.device, tt.args.request)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
+ if err != tt.wantErr && got == nil {
t.Errorf("Download_image() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -372,15 +373,14 @@
args args
wantErr error
}{
- {"get_device_details-1", &fields{}, args{}, errors.New("unImplemented")},
- {"get_device_details-2", &fields{}, args{}, errors.New("unImplemented")},
- {"get_device_details-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"get_device_details-1", &fields{}, args{}, ErrNotImplemented},
+ {"get_device_details-2", &fields{}, args{}, ErrNotImplemented},
+ {"get_device_details-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Get_device_details(tt.args.device); (err != nil) &&
- (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Get_device_details(tt.args.device); err != tt.wantErr {
t.Errorf("Get_device_details() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -400,18 +400,19 @@
wantErr error
}{
{"get_image_download_status-1", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123XYZ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"get_image_download_status-2", &fields{}, args{}, &voltha.ImageDownload{Id: "Image2-ABC123LKJ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"get_image_download_status-3", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123DFG"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Get_image_download_status(tt.args.device, tt.args.request)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
- t.Errorf("Get_image_download_status() error = %v, wantErr %v", err, tt.wantErr)
+ if err != tt.wantErr && got == nil {
+ t.Errorf("Get_image_download_status() got = %v want = %v error = %v, wantErr %v",
+ got, tt.want, err, tt.wantErr)
}
})
}
@@ -428,20 +429,28 @@
want *ic.SwitchCapability
wantErr error
}{
- {"get_ofp_device_info-1", mockOlt(), args{mockDevice()}, &ic.SwitchCapability{},
- errors.New("device-handler-not-set")},
- {"get_ofp_device_info-2", &fields{}, args{mockDevice()}, &ic.SwitchCapability{},
- errors.New("device-handler-not-set")},
+ {"get_ofp_device_info-1", mockOlt(), args{mockDevice()}, &ic.SwitchCapability{
+ Desc: &openflow_13.OfpDesc{
+ MfrDesc: "VOLTHA Project",
+ HwDesc: "open_pon",
+ SwDesc: "open_pon",
+ },
+ SwitchFeatures: &openflow_13.OfpSwitchFeatures{
+ NBuffers: uint32(256),
+ NTables: uint32(2),
+ Capabilities: uint32(15),
+ },
+ }, nil},
+ {"get_ofp_device_info-2", &fields{}, args{mockDevice()}, nil,
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Get_ofp_device_info(tt.args.device)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
- t.Errorf("Get_ofp_device_info() error = %v, wantErr %v", err, tt.wantErr)
- }
- if (err == nil) && got != nil {
- t.Log("got :", got)
+ if !reflect.DeepEqual(err, tt.wantErr) || !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Get_ofp_device_info() got = %v want = %v error = %v, wantErr = %v",
+ got, tt.want, err, tt.wantErr)
}
})
}
@@ -459,20 +468,31 @@
want *ic.PortCapability
wantErr error
}{
- {"get_ofp_port_info-1", mockOlt(), args{mockDevice(), 1}, &ic.PortCapability{},
- errors.New("device-handler-not-set")},
- {"get_ofp_port_info-2", &fields{}, args{mockDevice(), 1}, &ic.PortCapability{},
- errors.New("device-handler-not-set")},
+ {"get_ofp_port_info-1", mockOlt(), args{mockDevice(), 1}, &ic.PortCapability{
+ Port: &voltha.LogicalPort{
+ DeviceId: "olt",
+ DevicePortNo: uint32(1),
+ OfpPort: &openflow_13.OfpPort{
+ HwAddr: []uint32{1, 2, 3, 4, 5, 6},
+ State: uint32(4),
+ Curr: uint32(4128),
+ Advertised: uint32(4128),
+ Peer: uint32(4128),
+ CurrSpeed: uint32(32),
+ MaxSpeed: uint32(32),
+ },
+ },
+ }, nil},
+ {"get_ofp_port_info-2", &fields{}, args{mockDevice(), 1}, nil,
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Get_ofp_port_info(tt.args.device, tt.args.portNo)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
- t.Errorf("Get_ofp_port_info() error = %v, wantErr %v", err, tt.wantErr)
- }
- if (err == nil) && got != nil {
- t.Log("got :", got)
+ if !reflect.DeepEqual(err, tt.wantErr) || !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Get_ofp_port_info() got = %v want = %v error = %v, wantErr = %v",
+ got, tt.want, err, tt.wantErr)
}
})
}
@@ -485,15 +505,15 @@
want *voltha.HealthStatus
wantErr error
}{
- {"health-1", &fields{}, &voltha.HealthStatus{}, errors.New("unImplemented")},
- {"health-2", &fields{}, &voltha.HealthStatus{}, errors.New("unImplemented")},
- {"health-3", &fields{}, &voltha.HealthStatus{}, errors.New("unImplemented")},
+ {"health-1", &fields{}, &voltha.HealthStatus{}, ErrNotImplemented},
+ {"health-2", &fields{}, &voltha.HealthStatus{}, ErrNotImplemented},
+ {"health-3", &fields{}, &voltha.HealthStatus{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Health()
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
+ if err != tt.wantErr && got == nil {
t.Errorf("Get_ofp_port_info() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -519,23 +539,39 @@
Id: "olt",
ProxyDeviceId: "olt",
ToDeviceId: "olt",
+ Type: ic.InterAdapterMessageType_OMCI_REQUEST,
+ },
+ },
+ }
+ var message3 = args{
+ msg: &ic.InterAdapterMessage{
+ Header: &ic.InterAdapterHeader{
+ Id: "olt",
+ ProxyDeviceId: "olt",
+ ToDeviceId: "olt",
+ Type: ic.InterAdapterMessageType_FLOW_REQUEST,
},
},
}
tests := []struct {
- name string
- fields *fields
- args args
- wantErr error
+ name string
+ fields *fields
+ args args
+ wantErrType reflect.Type
}{
- {"process_inter_adaptor_messgae-1", mockOlt(), message1, errors.New("handler-not-found")},
- {"process_inter_adaptor_messgae-2", mockOlt(), message2, errors.New("handler-not-found")},
+ {"process_inter_adaptor_messgae-1", mockOlt(), message1,
+ reflect.TypeOf(&ErrNotFound{})},
+ {"process_inter_adaptor_messgae-2", mockOlt(), message2,
+ reflect.TypeOf(errors.New("message is nil"))},
+ {"process_inter_adaptor_messgae-3", mockOlt(), message3,
+ reflect.TypeOf(&ErrInvalidValue{})},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Process_inter_adapter_message(tt.args.msg); (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && err != nil {
- t.Errorf("Process_inter_adapter_message() error = %v, wantErr %v", err, tt.wantErr)
+ if err := oo.Process_inter_adapter_message(tt.args.msg); reflect.TypeOf(err) != tt.wantErrType {
+ t.Errorf("Process_inter_adapter_message() error = %v, wantErr %v",
+ reflect.TypeOf(err), tt.wantErrType)
}
})
}
@@ -551,13 +587,14 @@
args args
wantErr error
}{
- {"reboot_device-1", mockOlt(), args{mockDevice()}, errors.New("device-handler-not-found")},
- {"reboot_device-2", &fields{}, args{mockDevice()}, errors.New("device-handler-not-found")},
+ {"reboot_device-1", mockOlt(), args{mockDevice()}, nil},
+ {"reboot_device-2", &fields{}, args{mockDevice()},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Reboot_device(tt.args.device); (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && err != nil {
+ if err := oo.Reboot_device(tt.args.device); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Reboot_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -584,15 +621,14 @@
args args
wantErr error
}{
- {"receive_packet_out-1", mockOlt(), args{mockDevice().Id, 1, pktout},
- errors.New("device-handler-not-set")},
+ {"receive_packet_out-1", mockOlt(), args{mockDevice().Id, 1, pktout}, nil},
{"receive_packet_out-2", mockOlt(), args{"1234", 1, pktout},
- errors.New("device-handler-not-set")},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "1234"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Receive_packet_out(tt.args.deviceID, tt.args.egressPortNo, tt.args.packet); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Receive_packet_out(tt.args.deviceID, tt.args.egressPortNo, tt.args.packet); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Receive_packet_out() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -603,20 +639,21 @@
type args struct {
device *voltha.Device
}
+ expectedError := NewErrInvalidValue(log.Fields{"device": nil}, nil)
tests := []struct {
name string
fields *fields
args args
wantErr error
}{
- {"reconcile_device-1", &fields{}, args{}, errors.New("unImplemented")},
- {"reconcile_device-2", &fields{}, args{}, errors.New("unImplemented")},
- {"reconcile_device-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"reconcile_device-1", &fields{}, args{}, expectedError},
+ {"reconcile_device-2", &fields{}, args{}, expectedError},
+ {"reconcile_device-3", &fields{}, args{}, expectedError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Reconcile_device(tt.args.device); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Reconcile_device(tt.args.device); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Reconcile_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -633,13 +670,14 @@
args args
wantErr error
}{
- {"reenable_device-1", mockOlt(), args{mockDevice()}, errors.New("device-handler-not-found")},
- {"reenable_device-2", &fields{}, args{mockDevice()}, errors.New("device-handler-not-found")},
+ {"reenable_device-1", mockOlt(), args{mockDevice()}, nil},
+ {"reenable_device-2", &fields{}, args{mockDevice()},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Reenable_device(tt.args.device); (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && err != nil {
+ if err := oo.Reenable_device(tt.args.device); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Reenable_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -659,17 +697,17 @@
wantErr error
}{
{"revert_image_update-1", &fields{}, args{}, &voltha.ImageDownload{Id: "Image1-ABC123XYZ"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"revert_image_update-2", &fields{}, args{}, &voltha.ImageDownload{Id: "Image2-ABC123TYU"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
{"revert_image_update-3", &fields{}, args{}, &voltha.ImageDownload{Id: "Image3-ABC123GTH"},
- errors.New("unImplemented")},
+ ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
got, err := oo.Revert_image_update(tt.args.device, tt.args.request)
- if (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) && got == nil {
+ if err != tt.wantErr && got == nil {
t.Log("error :", err)
}
})
@@ -686,14 +724,14 @@
args args
wantErr error
}{
- {"self_test_device-1", &fields{}, args{}, errors.New("unImplemented")},
- {"self_test_device-2", &fields{}, args{}, errors.New("unImplemented")},
- {"self_test_device-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"self_test_device-1", &fields{}, args{}, ErrNotImplemented},
+ {"self_test_device-2", &fields{}, args{}, ErrNotImplemented},
+ {"self_test_device-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Self_test_device(tt.args.device); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Self_test_device(tt.args.device); err != tt.wantErr {
t.Errorf("Self_test_device() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -755,15 +793,14 @@
args args
wantErr error
}{
- {"suppress_event-1", &fields{}, args{}, errors.New("unImplemented")},
- {"suppress_event-2", &fields{}, args{}, errors.New("unImplemented")},
- {"suppress_event-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"suppress_event-1", &fields{}, args{}, ErrNotImplemented},
+ {"suppress_event-2", &fields{}, args{}, ErrNotImplemented},
+ {"suppress_event-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Suppress_event(tt.args.filter); (err != nil) &&
- (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Suppress_event(tt.args.filter); err != tt.wantErr {
t.Errorf("Suppress_event() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -780,15 +817,14 @@
args args
wantErr error
}{
- {"unsupress_event-1", &fields{}, args{}, errors.New("unImplemented")},
- {"unsupress_event-2", &fields{}, args{}, errors.New("unImplemented")},
- {"unsupress_event-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"unsupress_event-1", &fields{}, args{}, ErrNotImplemented},
+ {"unsupress_event-2", &fields{}, args{}, ErrNotImplemented},
+ {"unsupress_event-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Unsuppress_event(tt.args.filter); (err != nil) &&
- (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Unsuppress_event(tt.args.filter); err != tt.wantErr {
t.Errorf("Unsuppress_event() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -808,14 +844,14 @@
args args
wantErr error
}{
- {"update_flows_bulk-1", &fields{}, args{}, errors.New("unImplemented")},
- {"update_flows_bulk-2", &fields{}, args{}, errors.New("unImplemented")},
- {"update_flows_bulk-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"update_flows_bulk-1", &fields{}, args{}, ErrNotImplemented},
+ {"update_flows_bulk-2", &fields{}, args{}, ErrNotImplemented},
+ {"update_flows_bulk-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Update_flows_bulk(tt.args.device, tt.args.flows, tt.args.groups, tt.args.flowMetadata); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Update_flows_bulk(tt.args.device, tt.args.flows, tt.args.groups, tt.args.flowMetadata); err != tt.wantErr {
t.Errorf("Update_flows_bulk() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -837,14 +873,13 @@
wantErr error
}{
{"update_flows_incrementally-1", &fields{}, args{device: mockDevice()},
- errors.New("device-handler-not-set")},
- {"update_flows_incrementally-1", mockOlt(), args{device: mockDevice()},
- errors.New("device-handler-not-set")},
+ NewErrNotFound("device-handler", log.Fields{"device-id": "olt"}, nil)},
+ {"update_flows_incrementally-2", mockOlt(), args{device: mockDevice()}, nil},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Update_flows_incrementally(tt.args.device, tt.args.flows, tt.args.groups, tt.args.flowMetadata); (err != nil) && (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Update_flows_incrementally(tt.args.device, tt.args.flows, tt.args.groups, tt.args.flowMetadata); !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("Update_flows_incrementally() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -862,15 +897,14 @@
args args
wantErr error
}{
- {"update_pm_config-1", &fields{}, args{}, errors.New("unImplemented")},
- {"update_pm_config-2", &fields{}, args{}, errors.New("unImplemented")},
- {"update_pm_config-3", &fields{}, args{}, errors.New("unImplemented")},
+ {"update_pm_config-1", &fields{}, args{}, ErrNotImplemented},
+ {"update_pm_config-2", &fields{}, args{}, ErrNotImplemented},
+ {"update_pm_config-3", &fields{}, args{}, ErrNotImplemented},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
oo := testOltObject(tt.fields)
- if err := oo.Update_pm_config(tt.args.device, tt.args.pmConfigs); (err != nil) &&
- (reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr)) {
+ if err := oo.Update_pm_config(tt.args.device, tt.args.pmConfigs); err != tt.wantErr {
t.Errorf("Update_pm_config() error = %v, wantErr %v", err, tt.wantErr)
}
diff --git a/adaptercore/statsmanager.go b/adaptercore/statsmanager.go
index 0905c1e..0ffa351 100755
--- a/adaptercore/statsmanager.go
+++ b/adaptercore/statsmanager.go
@@ -18,14 +18,12 @@
package adaptercore
import (
- "errors"
"fmt"
- "sync"
- "time"
-
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/openolt"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "sync"
+ "time"
)
var mutex = &sync.Mutex{}
@@ -235,7 +233,7 @@
return PONPorts, nil
} else {
log.Errorf("Invalid type of interface %s", Intftype)
- return nil, errors.New("invalid type of interface ")
+ return nil, NewErrInvalidValue(log.Fields{"interface-type": Intftype}, nil)
}
}