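VOL-2632 Propagate HashFlowStats errors through flow decomposition

fu.HashFlowStats and fu.MkFlowStat now return an error. The flow
decomposer's controller-bound, upstream, downstream next-table and
unicast helpers return (*fu.DeviceRules, error) and hand that error
back to their caller instead of ignoring it.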

Change-Id: If2872e97e2b6c3c751f64dadfef47bfde3a77551
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index 9e996dc..16754b9 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -74,7 +74,7 @@
 
 // Handles special case of any controller-bound flow for a parent device
 func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevide(flow *ofp.OfpFlowStats,
-	dr *fu.DeviceRules) *fu.DeviceRules {
+	dr *fu.DeviceRules) (*fu.DeviceRules, error) {
 	EAPOL := fu.EthType(0x888e)
 	IGMP := fu.IpProto(2)
 	UDP := fu.IpProto(17)
@@ -100,18 +100,21 @@
 						uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
 				}
 				// Update flow Id as a change in the instruction field will result in a new flow ID
-				f.Id = fu.HashFlowStats(f)
+				var err error
+				if f.Id, err = fu.HashFlowStats(f); err != nil {
+					return nil, err
+				}
 				newDeviceRules.AddFlow(deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
 			}
 		}
 	}
 
-	return newDeviceRules
+	return newDeviceRules, nil
 }
 
 //processControllerBoundFlow decomposes trap flows
 func (fd *FlowDecomposer) processControllerBoundFlow(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
-	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	log.Debugw("trap-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "flow": flow})
 	deviceRules := fu.NewDeviceRules()
@@ -136,7 +139,11 @@
 		// Augment the matchfields with the ofpfields from the flow
 		fg := fu.NewFlowsAndGroups()
 		fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
-		fg.AddFlow(fu.MkFlowStat(fa))
+		fs, err := fu.MkFlowStat(fa)
+		if err != nil {
+			return nil, err
+		}
+		fg.AddFlow(fs)
 		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	} else {
 		// Trap flow for UNI port
@@ -166,7 +173,11 @@
 			// Augment the matchfields with the ofpfields from the flow
 			faParent.MatchFields = append(faParent.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 			fgParent := fu.NewFlowsAndGroups()
-			fgParent.AddFlow(fu.MkFlowStat(faParent))
+			fs, err := fu.MkFlowStat(faParent)
+			if err != nil {
+				return nil, err
+			}
+			fgParent.AddFlow(fs)
 			deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fgParent)
 			log.Debugw("parent-trap-flow-set", log.Fields{"flow": faParent})
 
@@ -203,13 +214,17 @@
 				faChild.MatchFields = append(faChild.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
 			}
 			fgChild := fu.NewFlowsAndGroups()
-			fgChild.AddFlow(fu.MkFlowStat(faChild))
+			fs, err = fu.MkFlowStat(faChild)
+			if err != nil {
+				return nil, err
+			}
+			fgChild.AddFlow(fs)
 			deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fgChild)
 			log.Debugw("child-trap-flow-set", log.Fields{"flow": faChild})
 		}
 	}
 
-	return deviceRules
+	return deviceRules, nil
 }
 
 // processUpstreamNonControllerBoundFlow processes non-controller bound flow. We assume that anything that is
@@ -217,7 +232,7 @@
 // goto-statement. We also assume that the inner tag is applied at the ONU, while the outer tag is
 // applied at the OLT
 func (fd *FlowDecomposer) processUpstreamNonControllerBoundFlow(ctx context.Context,
-	path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+	path []route.Hop, inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	log.Debugw("upstream-non-controller-bound-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
@@ -232,7 +247,7 @@
 		log.Debugw("decomposing-onu-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
 		if outPortNo != 0 {
 			log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
-			return deviceRules
+			return deviceRules, nil
 		}
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
@@ -249,7 +264,11 @@
 		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
+		fs, err := fu.MkFlowStat(fa)
+		if err != nil {
+			return nil, err
+		}
+		fg.AddFlow(fs)
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	} else if flow.TableId == 1 && outPortNo != 0 {
 		log.Debugw("decomposing-olt-flow-in-upstream-has-next-table", log.Fields{"table_id": flow.TableId})
@@ -269,15 +288,19 @@
 		fa.Actions = filteredAction
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
+		fs, err := fu.MkFlowStat(fa)
+		if err != nil {
+			return nil, err
+		}
+		fg.AddFlow(fs)
 		deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
 	}
-	return deviceRules
+	return deviceRules, nil
 }
 
 // processDownstreamFlowWithNextTable decomposes downstream flows containing next table ID instructions
 func (fd *FlowDecomposer) processDownstreamFlowWithNextTable(ctx context.Context, agent coreif.LogicalDeviceAgent, path []route.Hop,
-	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 	log.Debugw("decomposing-olt-flow-in-downstream-flow-with-next-table", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
 	meterID := fu.GetMeterIdFromFlow(flow)
@@ -285,12 +308,12 @@
 
 	if outPortNo != 0 {
 		log.Warnw("outPort-should-not-be-specified", log.Fields{"outPortNo": outPortNo})
-		return deviceRules
+		return deviceRules, nil
 	}
 
 	if flow.TableId != 0 {
 		log.Warnw("This is not olt pipeline table, so skipping", log.Fields{"tableId": flow.TableId})
-		return deviceRules
+		return deviceRules, nil
 	}
 
 	ingressHop := path[0]
@@ -302,18 +325,18 @@
 			recalculatedRoute, err := agent.GetRoute(ctx, inPortNo, portNumber)
 			if err != nil {
 				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "metadata": metadataFromwriteMetadata, "error": err})
-				return deviceRules
+				return deviceRules, nil
 			}
 			switch len(recalculatedRoute) {
 			case 0:
 				log.Errorw("no-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
 				//TODO: Delete flow
-				return deviceRules
+				return deviceRules, nil
 			case 2:
 				log.Debugw("route-found", log.Fields{"ingressHop": ingressHop, "egressHop": egressHop})
 			default:
 				log.Errorw("invalid-route-length", log.Fields{"routeLen": len(path)})
-				return deviceRules
+				return deviceRules, nil
 			}
 			ingressHop = recalculatedRoute[0]
 		}
@@ -321,7 +344,7 @@
 		if innerTag == 0 {
 			log.Errorw("no-inner-route-double-tag", log.Fields{"inPortNo": inPortNo, "outPortNo": portNumber, "comment": "deleting-flow", "metadata": metadataFromwriteMetadata})
 			//TODO: Delete flow
-			return deviceRules
+			return deviceRules, nil
 		}
 		fa := &fu.FlowArgs{
 			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
@@ -339,7 +362,11 @@
 		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
+		fs, err := fu.MkFlowStat(fa)
+		if err != nil {
+			return nil, err
+		}
+		fg.AddFlow(fs)
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	} else { // Create standard flow
 		log.Debugw("creating-standard-flow", log.Fields{"flow": flow})
@@ -358,16 +385,20 @@
 		fa.Actions = append(fa.Actions, fu.Output(ingressHop.Egress))
 
 		fg := fu.NewFlowsAndGroups()
-		fg.AddFlow(fu.MkFlowStat(fa))
+		fs, err := fu.MkFlowStat(fa)
+		if err != nil {
+			return nil, err
+		}
+		fg.AddFlow(fs)
 		deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
 	}
 
-	return deviceRules
+	return deviceRules, nil
 }
 
 // processUnicastFlow decomposes unicast flows
 func (fd *FlowDecomposer) processUnicastFlow(ctx context.Context, path []route.Hop,
-	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) *fu.DeviceRules {
+	inPortNo uint32, outPortNo uint32, flow *ofp.OfpFlowStats) (*fu.DeviceRules, error) {
 
 	log.Debugw("decomposing-onu-flow-in-downstream-unicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
 	deviceRules := fu.NewDeviceRules()
@@ -391,9 +422,13 @@
 	fa.Actions = filteredAction
 
 	fg := fu.NewFlowsAndGroups()
-	fg.AddFlow(fu.MkFlowStat(fa))
+	fs, err := fu.MkFlowStat(fa)
+	if err != nil {
+		return nil, err
+	}
+	fg.AddFlow(fs)
 	deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
-	return deviceRules
+	return deviceRules, nil
 }
 
 // processMulticastFlow decompose multicast flows
@@ -457,7 +492,10 @@
 
 	// Process controller bound flow
 	if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
-		deviceRules = fd.processControllerBoundFlow(ctx, agent, path, inPortNo, outPortNo, flow)
+		deviceRules, err = fd.processControllerBoundFlow(ctx, agent, path, inPortNo, outPortNo, flow)
+		if err != nil {
+			return nil, err
+		}
 	} else {
 		var ingressDevice *voltha.Device
 		var err error
@@ -467,13 +505,22 @@
 		isUpstream := !ingressDevice.Root
 		if isUpstream { // Unicast OLT and ONU UL
 			log.Debug("process-olt-nd-onu-upstream-noncontrollerbound-unicast-flows", log.Fields{"flows": flow})
-			deviceRules = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
+			deviceRules, err = fd.processUpstreamNonControllerBoundFlow(ctx, path, inPortNo, outPortNo, flow)
+			if err != nil {
+				return nil, err
+			}
 		} else if fu.HasNextTable(flow) && flow.TableId == 0 { // Unicast OLT flow DL
 			log.Debugw("process-olt-downstream-noncontrollerbound-flow-with-nexttable", log.Fields{"flows": flow})
-			deviceRules = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
+			deviceRules, err = fd.processDownstreamFlowWithNextTable(ctx, agent, path, inPortNo, outPortNo, flow)
+			if err != nil {
+				return nil, err
+			}
 		} else if flow.TableId == 1 && outPortNo != 0 { // Unicast ONU flow DL
 			log.Debugw("process-onu-downstream-unicast-flow", log.Fields{"flows": flow})
-			deviceRules = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
+			deviceRules, err = fd.processUnicastFlow(ctx, path, inPortNo, outPortNo, flow)
+			if err != nil {
+				return nil, err
+			}
 		} else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { //Multicast
 			log.Debugw("process-multicast-flow", log.Fields{"flows": flow})
 			deviceRules = fd.processMulticastFlow(ctx, path, inPortNo, outPortNo, flow, grpID, groupMap)
@@ -481,6 +528,6 @@
 			return deviceRules, status.Errorf(codes.Aborted, "unknown downstream flow %v", *flow)
 		}
 	}
-	deviceRules = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
-	return deviceRules, nil
+	deviceRules, err = fd.updateOutputPortForControllerBoundFlowForParentDevide(flow, deviceRules)
+	return deviceRules, err
 }