VOL-2293, VOL-2456 improve error handling: return structured errors from the flow manager
Change-Id: I4be5f12719a31b40363758cd47cc02968f180c75
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index c817455..5a4225a 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -22,7 +22,6 @@
"crypto/md5"
"encoding/hex"
"encoding/json"
- "errors"
"fmt"
"math/big"
"sync"
@@ -268,8 +267,7 @@
log.Debug("multicast flow, shifting id")
return 0x2<<15 | uint64(flowID), nil
} else {
- log.Debug("Unrecognized direction")
- return 0, fmt.Errorf("unrecognized direction %s", direction)
+ return 0, NewErrInvalidValue(log.Fields{"direction": direction}, nil).Log()
}
}
@@ -362,8 +360,10 @@
log.Debug("Scheduler already created for upstream")
return nil
}
- log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": sq.meterID})
- return errors.New("invalid-meter-id-in-flow")
+ return NewErrInvalidValue(log.Fields{
+ "unsupported": "meter-id",
+ "kv-store-meter-id": KvStoreMeter.MeterId,
+ "meter-id-in-flow": sq.meterID}, nil).Log()
}
log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": sq.meterID, "Direction": Direction})
@@ -392,12 +392,17 @@
log.Error("Flow-metadata-is-not-present-in-flow")
}
if meterConfig == nil {
- log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": sq.flowMetadata,
- "MeterID": sq.meterID})
- return errors.New("failed-to-get-meter-from-flowMetadata")
+ return NewErrNotFound("meterbands", log.Fields{
+ "reason": "Could-not-get-meterbands-from-flowMetadata",
+ "flow-metadata": sq.flowMetadata,
+ "meter-id": sq.meterID}, nil).Log()
} else if len(meterConfig.Bands) < MaxMeterBand {
log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": sq.meterID})
- return errors.New("invalid-number-of-bands-in-meter")
+ return NewErrInvalidValue(log.Fields{
+ "reason": "Invalid-number-of-bands-in-meter",
+ "meterband-count": len(meterConfig.Bands),
+ "metabands": meterConfig.Bands,
+ "meter-id": sq.meterID}, nil).Log()
}
cir := meterConfig.Bands[0].Rate
cbs := meterConfig.Bands[0].BurstSize
@@ -652,9 +657,10 @@
}
//Make sure we have as many tech_profiles as there are pon ports on the device
if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
- log.Errorw("Error while populating techprofile",
- log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
- return errors.New("error while populating techprofile mgrs")
+ return NewErrInvalidValue(log.Fields{
+ "reason": "TP count does not match number of PON ports",
+ "tech-profile-count": tpCount,
+ "pon-port-count": f.resourceMgr.DevInfo.GetPonPorts()}, nil).Log()
}
log.Infow("Populated techprofile for ponports successfully",
log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
@@ -664,10 +670,10 @@
func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, uplinkClassifier map[string]interface{},
uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32) {
+ allocID uint32, gemportID uint32) error {
uplinkClassifier[PacketTagType] = SingleTag
log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
- f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+ return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
Upstream, logicalFlow, allocID, gemportID)
/* TODO: Install Secondary EAP on the subscriber vlan */
}
@@ -675,7 +681,7 @@
func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, downlinkClassifier map[string]interface{},
downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemportID uint32) {
+ allocID uint32, gemportID uint32) error {
downlinkClassifier[PacketTagType] = DoubleTag
log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
"downlinkAction": downlinkAction})
@@ -685,7 +691,7 @@
if metadata, exists := downlinkClassifier[Metadata]; exists { // inport is filled in metadata by core
if uint32(metadata.(uint64)) == MkUniPortNum(intfID, onuID, uniID) {
log.Infow("Ignoring DL trap device flow from core", log.Fields{"flow": logicalFlow})
- return
+ return nil
}
}
}
@@ -698,18 +704,18 @@
if ok {
downlinkAction[VlanVid] = dlClVid & 0xfff
} else {
- log.Error("dl-classifier-vid-type-conversion-failed")
- return
+ return NewErrInvalidValue(log.Fields{
+ "reason": "failed to convert VLANID classifier",
+ "vlan-id": VlanVid}, nil).Log()
}
- f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+ return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
Downstream, logicalFlow, allocID, gemportID)
}
func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
- allocID uint32, gemPortID uint32) {
- var networkIntfID uint32
+ allocID uint32, gemPortID uint32) error {
/* One of the OLT platform (Broadcom BAL) requires that symmetric
flows require the same flow_id to be used across UL and DL.
Since HSIA flow is the only symmetric flow currently, we need to
@@ -724,33 +730,35 @@
if _, ok := classifier[VlanPcp]; ok {
vlanPbit = classifier[VlanPcp].(uint32)
log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
+ } else {
+ log.Debugw("bpit-not-found-in-flow", log.Fields{"vlan-pcp": VlanPcp})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("flow-already-exists")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
- log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
- return
+ return NewErrNotFound("hsia-flow-id", log.Fields{"direction": direction}, err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for hsia flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Errorw("Error in making action protobuf for hsia flow", log.Fields{"direction": direction})
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
- networkIntfID, err = getNniIntfID(classifier, action)
+ networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id",
+ log.Fields{
+ "classifier": classifier,
+ "action": action,
+ }, err).Log()
}
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
@@ -765,29 +773,28 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
- log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
- log.Errorw("Error uploading HSIA flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+ return NewErrFlowOp("add", flowID, nil, err).Log()
}
+ log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+ flow.OnuId,
+ flow.UniId,
+ flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": flow}, err).Log()
+ }
+ return nil
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
- var dhcpFlow openoltpb2.Flow
- var actionProto *openoltpb2.Action
- var classifierProto *openoltpb2.Classifier
- var flowID uint32
networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
// Clear the action map
@@ -804,29 +811,32 @@
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
- return
+ return nil
}
- flowID, err = f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
- log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
- return
+ return NewErrNotFound("flow", log.Fields{
+ "interface-id": intfID,
+ "gem-port": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
log.Debugw("Creating UL DHCP flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID})
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
- dhcpFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ dhcpFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: flowID,
@@ -840,39 +850,37 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); ok {
- log.Debug("DHCP UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
- dhcpFlow.OnuId,
- dhcpFlow.UniId,
- dhcpFlow.FlowId, flowsToKVStore); err != nil {
- log.Errorw("Error uploading DHCP UL flow into KV store", log.Fields{"flow": dhcpFlow, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
+ }
+ log.Debug("DHCP UL flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
+ dhcpFlow.OnuId,
+ dhcpFlow.UniId,
+ dhcpFlow.FlowId, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", dhcpFlow.FlowId, log.Fields{"flow": dhcpFlow}, err).Log()
}
- return
+ return nil
}
//addIGMPTrapFlow creates IGMP trap-to-host flow
func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
- f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) error {
+ return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
}
//addUpstreamTrapFlow creates a trap-to-host flow
func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
- action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
-
- var flow openoltpb2.Flow
- var actionProto *openoltpb2.Action
- var classifierProto *openoltpb2.Classifier
+ action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) error {
networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
// Clear the action map
@@ -886,30 +894,34 @@
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
- log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": intfID,
+ "oni-id": onuID,
+ "cookie": flowStoreCookie,
+ "flow-type": flowType},
+ err).Log()
}
log.Debugw("Creating upstream trap flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID, "flowType": flowType})
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
- flow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: flowID,
@@ -923,32 +935,29 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
- log.Debugf("%s UL flow added to device successfully", flowType)
+ if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": flow}, err).Log()
+ }
+ log.Debugf("%s UL flow added to device successfully", flowType)
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
- flow.OnuId,
- flow.UniId,
- flow.FlowId, flowsToKVStore); err != nil {
- log.Errorw("Error uploading UL flow into KV store", log.Fields{"flow": flow, "error": err})
- return
- }
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
+ flow.OnuId,
+ flow.UniId,
+ flow.FlowId, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow}, err).Log()
}
- return
+ return nil
}
// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) error {
log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
uplinkClassifier := make(map[string]interface{})
uplinkAction := make(map[string]interface{})
- var upstreamFlow openoltpb2.Flow
- var networkIntfID uint32
-
// Fill Classfier
uplinkClassifier[EthType] = uint32(EapEthType)
uplinkClassifier[PacketTagType] = SingleTag
@@ -957,36 +966,39 @@
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
//Add Uplink EAPOL Flow
uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": intfID,
+ "onu-id": onuID,
+ "coookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
log.Debugw("Creating UL EAPOL flow", log.Fields{"ul_classifier": uplinkClassifier, "ul_action": uplinkAction, "uplinkFlowId": uplinkFlowID})
- if classifierProto = makeOpenOltClassifierField(uplinkClassifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for ul flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(uplinkClassifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": uplinkClassifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(uplinkAction); actionProto == nil {
- log.Error("Error in making action protobuf for ul flow")
- return
+ actionProto, err := makeOpenOltActionField(uplinkAction)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": uplinkAction}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
- networkIntfID, err = getNniIntfID(classifier, action)
+ networkIntfID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
- upstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+ upstreamFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
FlowId: uplinkFlowID,
@@ -999,25 +1011,26 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); ok {
- log.Debug("EAPOL UL flow added to device successfully")
- flowCategory := "EAPOL"
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
- upstreamFlow.OnuId,
- upstreamFlow.UniId,
- upstreamFlow.FlowId,
- /* lowCategory, */
- flowsToKVStore); err != nil {
- log.Errorw("Error uploading EAPOL UL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
- return
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); err != nil {
+ return NewErrFlowOp("add", uplinkFlowID, log.Fields{"flow": upstreamFlow}, err).Log()
+ }
+ log.Debug("EAPOL UL flow added to device successfully")
+ flowCategory := "EAPOL"
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
+ upstreamFlow.OnuId,
+ upstreamFlow.UniId,
+ upstreamFlow.FlowId,
+ /* lowCategory, */
+ flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", upstreamFlow.FlowId, log.Fields{"flow": upstreamFlow}, err).Log()
}
log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
+ return nil
}
-func makeOpenOltClassifierField(classifierInfo map[string]interface{}) *openoltpb2.Classifier {
+func makeOpenOltClassifierField(classifierInfo map[string]interface{}) (*openoltpb2.Classifier, error) {
var classifier openoltpb2.Classifier
classifier.EthType, _ = classifierInfo[EthType].(uint32)
@@ -1054,14 +1067,13 @@
case DoubleTag:
case Untagged:
default:
- log.Error("Invalid tag type in classifier") // should not hit
- return nil
+ return nil, NewErrInvalidValue(log.Fields{"packet-tag-type": pktTagType}, nil).Log()
}
}
- return &classifier
+ return &classifier, nil
}
-func makeOpenOltActionField(actionInfo map[string]interface{}) *openoltpb2.Action {
+func makeOpenOltActionField(actionInfo map[string]interface{}) (*openoltpb2.Action, error) {
var actionCmd openoltpb2.ActionCmd
var action openoltpb2.Action
action.Cmd = &actionCmd
@@ -1074,10 +1086,9 @@
} else if _, ok := actionInfo[TrapToHost]; ok {
action.Cmd.TrapToHost = actionInfo[TrapToHost].(bool)
} else {
- log.Errorw("Invalid-action-field", log.Fields{"action": actionInfo})
- return nil
+ return nil, NewErrInvalidValue(log.Fields{"action-command": actionInfo}, nil).Log()
}
- return &action
+ return &action, nil
}
func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string, TpID uint32) string {
@@ -1194,7 +1205,7 @@
return nil
}
-func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1213,37 +1224,37 @@
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
- return true
+ return nil
}
if err != nil {
log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": deviceFlow})
f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
- return false
+ return err
}
if deviceFlow.GemportId != -1 {
// No need to register the flow if it is a trap on nni flow.
f.registerFlow(ctx, logicalFlow, deviceFlow)
}
log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
- return true
+ return nil
}
-func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) removeFlowFromDevice(deviceFlow *openoltpb2.Flow) error {
log.Debugw("Sending flow to device via grpc", log.Fields{"flow": *deviceFlow})
_, err := f.deviceHandler.Client.FlowRemove(context.Background(), deviceFlow)
if err != nil {
if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
log.Warnw("Can not remove flow from device since it's unreachable", log.Fields{"err": err, "deviceFlow": deviceFlow})
//Assume the flow is removed
- return true
+ return nil
}
log.Errorw("Failed to Remove flow from device", log.Fields{"err": err, "deviceFlow": deviceFlow})
- return false
+ return err
}
log.Debugw("Flow removed from device successfully ", log.Fields{"flow": *deviceFlow})
- return true
+ return nil
}
/*func register_flow(deviceFlow *openolt_pb2.Flow, logicalFlow *ofp.OfpFlowStats){
@@ -1266,7 +1277,7 @@
*/
-func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) {
+func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
@@ -1291,28 +1302,34 @@
var uniID = -1
var gemPortID = -1
- var networkInterfaceID = IntfIDFromNniPortNum(portNo)
+ networkInterfaceID, err := IntfIDFromNniPortNum(portNo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
+ }
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
- return
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
- log.Error("Error in making classifier protobuf for LLDP trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(actionInfo); actionProto == nil {
- log.Error("Error in making action protobuf for LLDP trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(actionInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": actionInfo}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
@@ -1328,17 +1345,18 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, flow, &downstreamflow); ok {
- log.Debug("LLDP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading LLDP flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, flow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("LLDP trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
func getUniPortPath(intfID uint32, onuID int32, uniID int32) string {
@@ -1349,10 +1367,12 @@
func (f *OpenOltFlowMgr) getOnuChildDevice(intfID uint32, onuID uint32) (*voltha.Device, error) {
log.Debugw("GetChildDevice", log.Fields{"pon port": intfID, "onuId": onuID})
parentPortNo := IntfIDToPortNo(intfID, voltha.Port_PON_OLT)
- onuDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
- if onuDevice == nil {
- log.Errorw("onu not found", log.Fields{"intfId": parentPortNo, "onuId": onuID})
- return nil, errors.New("onu not found")
+ onuDevice, err := f.deviceHandler.GetChildDevice(parentPortNo, onuID)
+ if err != nil {
+ return nil, NewErrNotFound("onu", log.Fields{
+ "interface-id": parentPortNo,
+ "onu-id": onuID},
+ err).Log()
}
log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
return onuDevice, nil
@@ -1575,6 +1595,7 @@
return nil
}
+// nolint: gocyclo
func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
@@ -1585,8 +1606,6 @@
}
var updatedFlows []rsrcMgr.FlowInfo
- var flowID uint32
- var onuID, uniID int32
classifierInfo := make(map[string]interface{})
portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
@@ -1595,8 +1614,8 @@
return
}
- onuID = int32(onu)
- uniID = int32(uni)
+ onuID := int32(onu)
+ uniID := int32(uni)
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.IP_PROTO {
@@ -1611,10 +1630,17 @@
onuID = -1
uniID = -1
log.Debug("Trap on nni flow set oni, uni to -1")
- Intf = IntfIDFromNniPortNum(inPort)
+ Intf, err = IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ log.Errorw("invalid-in-port-number",
+ log.Fields{
+ "port-number": inPort,
+ "error": err})
+ return
+ }
}
flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
- for _, flowID = range flowIds {
+ for _, flowID := range flowIds {
flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No FlowInfo found found in KV store",
@@ -1630,18 +1656,17 @@
if flow.Id == storedFlow.LogicalFlowID {
removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
log.Debugw("Flow to be deleted", log.Fields{"flow": storedFlow})
- if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
- log.Debug("Flow removed from device successfully")
- //Remove the Flow from FlowInfo
- updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
- flowID, flowDirection, portNum, updatedFlows)
- if err != nil {
- log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
- return
- }
- } else {
- log.Error("Failed to remove flow from device")
+ // This function has no error return: log the device removal failure and stop the cleanup.
+ if err = f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+ log.Errorw("failed-to-remove-flow", log.Fields{"error": err})
+ return
+ }
+ log.Debug("Flow removed from device successfully")
+ //Remove the Flow from FlowInfo
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ if err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+ flowID, flowDirection, portNum, updatedFlows); err != nil {
+ log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
return
}
}
@@ -1661,7 +1686,15 @@
return
}
- networkInterfaceID := IntfIDFromNniPortNum(inPort)
+ networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ // inPort does not map to an NNI interface ID: log and abort.
+ log.Errorw("invalid-in-port-number",
+ log.Fields{
+ "port-number": inPort,
+ "error": err})
+ return
+ }
var onuID = int32(NoneOnuID)
var uniID = int32(NoneUniID)
var flowID uint32
@@ -1685,8 +1718,12 @@
removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
log.Debugw("Multicast flow to be deleted", log.Fields{"flow": storedFlow})
//remove from device
- if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
- log.Errorw("Failed to remove multicast flow from device", log.Fields{"flowId": flow.Id})
+ if err := f.removeFlowFromDevice(&removeFlowMessage); err != nil {
+ // Removal from the device failed: log and abort.
+ log.Errorw("failed-to-remove-multicast-flow",
+ log.Fields{
+ "flow-id": flow.Id,
+ "error": err})
return
}
log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
@@ -1863,14 +1900,13 @@
}
// handleFlowWithGroup adds multicast flow to the device.
-func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
classifierInfo[PacketTagType] = DoubleTag
log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
- log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
- return
+ return NewErrNotFound("multicast-in-port", log.Fields{"classifier": classifierInfo}, err).Log()
}
//replace ipDst with ethDst
if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
@@ -1883,26 +1919,33 @@
log.Debugw("multicast-ip-to-mac-conversion-success", log.Fields{"ip:": ipv4Dst.(uint32), "mac:": multicastMac})
}
- var onuID = NoneOnuID
- var uniID = NoneUniID
- var gemPortID = NoneGemPortID
+ onuID := NoneOnuID
+ uniID := NoneUniID
+ gemPortID := NoneGemPortID
- networkInterfaceID := IntfIDFromNniPortNum(inPort)
+ networkInterfaceID, err := IntfIDFromNniPortNum(inPort)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"nni-in-port-number": inPort}, err).Log()
+ }
- var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
+ flowStoreCookie := getFlowStoreCookie(classifierInfo, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
- return
+ log.Debugw("multicast-flow-exists-not-re-adding", log.Fields{"classifierInfo": classifierInfo})
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
- log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("multicast-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
- log.Error("Error in making classifier protobuf for multicast flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifierInfo)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err).Log()
}
groupID := actionInfo[GroupID].(uint32)
multicastFlow := openoltpb2.Flow{
@@ -1914,27 +1957,28 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie}
- if ok := f.addFlowToDevice(ctx, flow, &multicastFlow); ok {
- log.Debug("multicast flow added to device successfully")
- //get cached group
- group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
- if err == nil {
- //calling groupAdd to set group members after multicast flow creation
- if f.ModifyGroup(ctx, group) {
- //cached group can be removed now
- f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
- }
- }
-
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading multicast flow into KV store", log.Fields{"flow": multicastFlow, "error": err})
+ if err = f.addFlowToDevice(ctx, flow, &multicastFlow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+ }
+ log.Debug("multicast flow added to device successfully")
+ //get cached group
+ group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
+ if err == nil {
+ //calling groupAdd to set group members after multicast flow creation
+ if f.ModifyGroup(ctx, group) {
+ //cached group can be removed now
+ f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
}
}
- return
+
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+ if err = f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": multicastFlow}, err).Log()
+ }
+ return nil
}
//getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
@@ -1947,7 +1991,7 @@
if e == nil && len(nniPorts) > 0 {
return nniPorts[0], nil
}
- return 0, errors.New("cannot find NNI port of device")
+ return 0, NewErrNotFound("nni-port", nil, e).Log()
}
// AddGroup add or update the group
@@ -2236,8 +2280,11 @@
}
}
}
- log.Errorw("onuid is not found", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPort": gemPortID})
- return uint32(0), errors.New("key error, onuid is not found") // ONU ID 0 is not a valid one
+ return uint32(0), NewErrNotFound("onu-id", log.Fields{
+ "serial-number": serialNumber,
+ "interface-id": intfID,
+ "gem-port-id": gemPortID},
+ nil).Log()
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
@@ -2302,10 +2349,10 @@
func installFlowOnAllGemports(ctx context.Context,
f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
portNo uint32, classifier map[string]interface{}, action map[string]interface{},
- logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
+ logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32) error,
f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
- classifier map[string]interface{}, action map[string]interface{}),
+ classifier map[string]interface{}, action map[string]interface{}) error,
args map[string]uint32,
classifier map[string]interface{}, action map[string]interface{},
logicalFlow *ofp.OfpFlowStats,
@@ -2325,13 +2372,11 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
log.Debug("Adding trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
classifier[PacketTagType] = DoubleTag
action[TrapToHost] = true
- var err error
- var networkInterfaceID uint32
/* We manage flowId resource pool on per PON port basis.
Since this situation is tricky, as a hack, we pass the NNI port
index (network_intf_id) as PON port Index for the flowId resource
@@ -2347,32 +2392,37 @@
uniID := -1
gemPortID := -1
allocID := -1
- networkInterfaceID, err = getNniIntfID(classifier, action)
+ networkInterfaceID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("Flow-exists--not-re-adding")
- return
+ log.Debug("Flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
- log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("dhcp-trap-nni-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for dhcp trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for dhcp trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto", log.Fields{"action": *actionProto})
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2388,17 +2438,18 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
- log.Debug("DHCP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading DHCP DL flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("DHCP trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
//getPacketTypeFromClassifiers finds and returns packet type of a flow by checking flow classifiers
@@ -2428,7 +2479,7 @@
}
//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
-func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
action := make(map[string]interface{})
classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
@@ -2450,29 +2501,34 @@
allocID := -1
networkInterfaceID, err := getNniIntfID(classifier, action)
if err != nil {
- log.Error("Failed to get nniIntf ID")
- return
+ return NewErrNotFound("nni-interface-id", log.Fields{
+ "classifier": classifier,
+ "action": action},
+ err).Log()
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
- log.Debug("igmp-flow-exists--not-re-adding")
- return
+ log.Debug("igmp-flow-exists-not-re-adding")
+ return nil
}
flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
- log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
- return
+ return NewErrNotFound("igmp-flow-id", log.Fields{
+ "interface-id": networkInterfaceID,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "gem-port-id": gemPortID,
+ "cookie": flowStoreCookie},
+ err).Log()
}
- var classifierProto *openoltpb2.Classifier
- var actionProto *openoltpb2.Action
- if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for igmp trap on nni flow")
- return
+ classifierProto, err := makeOpenOltClassifierField(classifier)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
}
log.Debugw("Created classifier proto for the IGMP flow", log.Fields{"classifier": *classifierProto})
- if actionProto = makeOpenOltActionField(action); actionProto == nil {
- log.Error("Error in making action protobuf for IGMP trap on nni flow")
- return
+ actionProto, err := makeOpenOltActionField(action)
+ if err != nil {
+ return NewErrInvalidValue(log.Fields{"action": action}, err).Log()
}
log.Debugw("Created action proto for the IGMP flow", log.Fields{"action": *actionProto})
downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
@@ -2488,23 +2544,23 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
- log.Debug("IGMP Trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
- int32(onuID),
- int32(uniID),
- flowID, flowsToKVStore); err != nil {
- log.Errorw("Error uploading igmp-trap-on-nni flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
- }
+ if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
+ return NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err).Log()
}
- return
+ log.Debug("IGMP Trap on NNI flow added to device successfully")
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
+ int32(onuID),
+ int32(uniID),
+ flowID, flowsToKVStore); err != nil {
+ return NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err).Log()
+ }
+ return nil
}
func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
if MeterID == 0 { // This should never happen
- log.Error("Invalid meter id")
- return "", errors.New("invalid meter id")
+ return "", NewErrInvalidValue(log.Fields{"meter-id": MeterID}, nil).Log()
}
if Dir == tp_pb.Direction_UPSTREAM {
return "upstream", nil
@@ -2699,8 +2755,7 @@
actionInfo[Output] = out.GetPort()
log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
} else {
- log.Error("Invalid output port in action")
- return errors.New("invalid output port in action")
+ return NewErrInvalidValue(log.Fields{"output-port": nil}, nil).Log()
}
} else if action.Type == flows.POP_VLAN {
actionInfo[PopVlan] = true
@@ -2720,8 +2775,7 @@
if out := action.GetSetField(); out != nil {
if field := out.GetField(); field != nil {
if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
- log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
- return errors.New("invalid openflow class")
+ return NewErrInvalidValue(log.Fields{"openflow-class": ofClass}, nil).Log()
}
/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
formulateSetFieldActionInfoFromFlow(field, actionInfo)
@@ -2730,8 +2784,7 @@
} else if action.Type == flows.GROUP {
formulateGroupActionInfoFromFlow(action, actionInfo)
} else {
- log.Errorw("Un supported action type", log.Fields{"type": action.Type})
- return errors.New("un supported action type")
+ return NewErrInvalidValue(log.Fields{"action-type": action.Type}, nil).Log()
}
}
return nil
@@ -2770,8 +2823,9 @@
classifierInfo[InPort] = uniPort
log.Debugw("upstream pon-to-controller-flow,inport-in-tunnelid", log.Fields{"newInPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
} else {
- log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
- return errors.New("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
+ return NewErrNotFound("child-in-port", log.Fields{
+ "reason": "upstream pon-to-controller-flow, NO-inport-in-tunnelid",
+ "flow": flow}, nil).Log()
}
}
} else {
@@ -2782,8 +2836,9 @@
actionInfo[Output] = uniPort
log.Debugw("downstream-nni-to-pon-port-flow, outport-in-tunnelid", log.Fields{"newOutPort": actionInfo[Output].(uint32), "outPort": actionInfo[Output].(uint32)})
} else {
- log.Debug("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32), "outPort": actionInfo[Output].(uint32)})
- return errors.New("downstream-nni-to-pon-port-flow, no-outport-in-tunnelid")
+ return NewErrNotFound("out-port", log.Fields{
+ "reason": "downstream-nni-to-pon-port-flow, no-outport-in-tunnelid",
+ "flow": flow}, nil).Log()
}
// Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
} else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
@@ -2792,9 +2847,11 @@
log.Debugw("upstream-pon-to-nni-port-flow, inport-in-tunnelid", log.Fields{"newInPort": actionInfo[Output].(uint32),
"outport": actionInfo[Output].(uint32)})
} else {
- log.Debug("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid", log.Fields{"InPort": classifierInfo[InPort].(uint32),
- "outPort": actionInfo[Output].(uint32)})
- return errors.New("upstream-pon-to-nni-port-flow, no-inport-in-tunnelid")
+ return NewErrNotFound("nni-port", log.Fields{
+ "reason": "upstream-pon-to-nni-port-flow, no-inport-in-tunnelid",
+ "in-port": classifierInfo[InPort].(uint32),
+ "out-port": actionInfo[Output].(uint32),
+ "flow": flow}, nil).Log()
}
}
}
@@ -2811,8 +2868,7 @@
*/
metadata := flows.GetMetadataFromWriteMetadataAction(flow)
if metadata == 0 {
- log.Error("metadata-is-not-present-in-flow-which-is-mandatory")
- return 0, errors.New("metadata-is-not-present-in-flow-which-is-mandatory")
+ return 0, NewErrNotFound("metadata", log.Fields{"flow": flow}, nil).Log()
}
TpID := flows.GetTechProfileIDFromWriteMetaData(metadata)
return uint32(TpID), nil
@@ -2832,11 +2888,25 @@
portType := IntfIDToPortTypeName(classifier[InPort].(uint32))
if portType == voltha.Port_PON_OLT {
- intfID := IntfIDFromNniPortNum(action[Output].(uint32))
+ intfID, err := IntfIDFromNniPortNum(action[Output].(uint32))
+ if err != nil {
+ log.Debugw("invalid-action-port-number",
+ log.Fields{
+ "port-number": action[Output].(uint32),
+ "error": err})
+ return uint32(0), err
+ }
log.Debugw("output Nni IntfID is", log.Fields{"intfid": intfID})
return intfID, nil
} else if portType == voltha.Port_ETHERNET_NNI {
- intfID := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+ intfID, err := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+ if err != nil {
+ log.Debugw("invalid-classifier-port-number",
+ log.Fields{
+ "port-number": classifier[InPort].(uint32),
+ "error": err})
+ return uint32(0), err
+ }
log.Debugw("input Nni IntfID is", log.Fields{"intfid": intfID})
return intfID, nil
}
@@ -2922,8 +2992,7 @@
func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
if err != nil {
- log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
- return nil, false, errors.New("failed to retrieve the flow group")
+ return nil, false, NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err).Log()
}
if exists {
return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil