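VOL-2632 Error propagation from HashFlowStats

Callers in the flow decomposer now handle the error returned by
fu.HashFlowStats and fu.MkFlowStat instead of using the result directly.
The controller-bound, upstream, downstream-with-next-table and unicast
decomposition helpers, as well as
updateOutputPortForControllerBoundFlowForParentDevide, return
(*fu.DeviceRules, error) and hand any hashing failure back to their caller.
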
Change-Id: If2872e97e2b6c3c751f64dadfef47bfde3a77551
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index 9e996dc..16754b9 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -74,7 +74,7 @@
// Handles special case of any controller-bound flow for a parent device
func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
- dr *fu.DeviceRules) *fu.DeviceRules {
+ dr *fu.DeviceRules) (*fu.DeviceRules, error) {
EAPOL := fu.EthType(0x888e)
IGMP := fu.IpProto(2)
UDP := fu.IpProto(17)
@@ -100,18 +100,21 @@
uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
}
// Update flow Id as a change in the instruction field will result in a new flow ID
- f.Id = fu.HashFlowStats(f)
+ var err error
+ if f.Id, err = fu.HashFlowStats(f); err != nil {
+ return nil, err
+ }
newDeviceRules.AddFlow(deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
}
}
}
- return newDeviceRules
+ return newDeviceRules, nil
}
//processControllerBoundFlow decomposes trap flows
func (fd *FlowDecomposer) processControllerBoundFlow(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
- inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+ inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
log.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
deviceRules := fu.NewDeviceRules()
@@ -136,7 +139,11 @@
// Augment the matchfields with the ofpfields from the flow
fg := fu.NewFlowsAndGroups()
fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
} else {
// Trap flow for UNI port
@@ -166,7 +173,11 @@
// Augment the matchfields with the ofpfields from the flow
faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
fgParent := fu.NewFlowsAndGroups()
- fgParent.AddFlow(fu.MkFlowStat(faParent))
+ fs, err := fu.MkFlowStat(faParent)
+ if err != nil {
+ return nil, err
+ }
+ fgParent.AddFlow(fs)
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fgParent)
log.Debugw("parent-trap-flow-set", log.Fields{"flow": faParent})
@@ -203,13 +214,17 @@
faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
}
fgChild := fu.NewFlowsAndGroups()
- fgChild.AddFlow(fu.MkFlowStat(faChild))
+ fs, err = fu.MkFlowStat(faChild)
+ if err != nil {
+ return nil, err
+ }
+ fgChild.AddFlow(fs)
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fgChild)
log.Debugw("child-trap-flow-set", log.Fields{"flow": faChild})
}
}
- return deviceRules
+ return deviceRules, nil
}
// processUpstreamNonControllerBoundFlow processes non-controller bound flow. We assume that anything that is
@@ -217,7 +232,7 @@
// goto-statement. We also assume that the inner tag is applied at the ONU, while the outer tag is
// applied at the OLT
func (fd *FlowDecomposer) processUpstreamNonControllerBoundFlow(ctx context.Context,
- path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+ path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
log.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
deviceRules := fu.NewDeviceRules()
@@ -232,7 +247,7 @@
log.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
if outPortNo != 0 {
log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
- return deviceRules
+ return deviceRules, nil
}
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
@@ -249,7 +264,11 @@
fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
fg := fu.NewFlowsAndGroups()
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
} else if flow.TableId == 1 && outPortNo != 0 {
log.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
@@ -269,15 +288,19 @@
fa.Actions = filteredAction
fg := fu.NewFlowsAndGroups()
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
}
- return deviceRules
+ return deviceRules, nil
}
// processDownstreamFlowWithNextTable decomposes downstream flows containing next table ID instructions
func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
- inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+ inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
log.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
deviceRules := fu.NewDeviceRules()
meterID := fu.GetMeterIdFromFlow(flow)
@@ -285,12 +308,12 @@
if outPortNo != 0 {
log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
- return deviceRules
+ return deviceRules, nil
}
if flow.TableId != 0 {
log.Warnw("This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
- return deviceRules
+ return deviceRules, nil
}
ingressHop := path[0]
@@ -302,18 +325,18 @@
recalculatedRoute, err := agent.GetRoute(ctx, inPortNo, portNumber)
if err != nil {
log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "metadata": metadataFromwriteMetadata, "error": err})
- return deviceRules
+ return deviceRules, nil
}
switch len(recalculatedRoute) {
case 0:
log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
//TODO: Delete flow
- return deviceRules
+ return deviceRules, nil
case 2:
log.Debugw("route-found", log.Fields{"ingressHop": ingressHop, "egressHop": egressHop})
default:
log.Errorw("invalid-route-length", log.Fields{"routeLen": len(path)})
- return deviceRules
+ return deviceRules, nil
}
ingressHop = recalculatedRoute[0]
}
@@ -321,7 +344,7 @@
if innerTag == 0 {
log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
//TODO: Delete flow
- return deviceRules
+ return deviceRules, nil
}
fa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
@@ -339,7 +362,11 @@
fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
fg := fu.NewFlowsAndGroups()
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
} else { // Create standard flow
log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
@@ -358,16 +385,20 @@
fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
fg := fu.NewFlowsAndGroups()
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
}
- return deviceRules
+ return deviceRules, nil
}
// processUnicastFlow decomposes unicast flows
func (fd *FlowDecomposer) processUnicastFlow(ctx context.Context, path []route.Hop,
- inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+ inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
log.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
deviceRules := fu.NewDeviceRules()
@@ -391,9 +422,13 @@
fa.Actions = filteredAction
fg := fu.NewFlowsAndGroups()
- fg.AddFlow(fu.MkFlowStat(fa))
+ fs, err := fu.MkFlowStat(fa)
+ if err != nil {
+ return nil, err
+ }
+ fg.AddFlow(fs)
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
- return deviceRules
+ return deviceRules, nil
}
// processMulticastFlow decompose multicast flows
@@ -457,7 +492,10 @@
// Process controller bound flow
if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
- deviceRules = fd.processControllerBoundFlow(ctx, agent, path, inPortNo, outPortNo, flow)
+ deviceRules, err = fd.processControllerBoundFlow(ctx, agent, path, inPortNo, outPortNo, flow)
+ if err != nil {
+ return nil, err
+ }
} else {
var ingressDevice *voltha.Device
var err error
@@ -467,13 +505,22 @@
isUpstream := !ingressDevice.Root
if isUpstream { // Unicast OLT and ONU UL
log.Debug("process-olt-nd-onu-upstream-noncontrollerbound-unicast-flows", log.Fields{"flows": flow})
- deviceRules = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
+ deviceRules, err = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
+ if err != nil {
+ return nil, err
+ }
} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
log.Debugw("process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
- deviceRules = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
+ deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
+ if err != nil {
+ return nil, err
+ }
} else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
log.Debugw("process-onu-downstream-unicast-flow", log.Fields{"flows": flow})
- deviceRules = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
+ deviceRules, err = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
+ if err != nil {
+ return nil, err
+ }
} else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { //Multicast
log.Debugw("process-multicast-flow", log.Fields{"flows": flow})
deviceRules = fd.processMulticastFlow(ctx, path, inPortNo, outPortNo, flow, grpID, groupMap)
@@ -481,6 +528,6 @@
return deviceRules, status.Errorf(codes.Aborted, "unknown downstream flow %v", *flow)
}
}
- deviceRules = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
- return deviceRules, nil
+ deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
+ return deviceRules, err
}