[VOL-4478]  If a flow operation fails, the OLT adapter should return an error to the core.

Change-Id: Ie948e9a355ddf3838179a32112e4d17ae3f70b41
diff --git a/VERSION b/VERSION
index 4d0dcda..de197cc 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.1.2
+4.1.3
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 57833fa..3f09ba4 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -384,9 +384,7 @@
 	/* Flows can be added specific to gemport if p-bits are received.
 	 * If no pbit mentioned then adding flows for all gemports
 	 */
-	f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
-
-	return nil
+	return f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
 }
 
 // CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
@@ -2834,7 +2832,7 @@
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) checkAndAddFlow(ctx context.Context, args map[string]uint32, classifierInfo map[string]interface{},
 	actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpInst interface{}, gemPorts []uint32,
-	tpID uint32, uni string) {
+	tpID uint32, uni string) error {
 	var gemPortID uint32
 	intfID := args[IntfID]
 	onuID := args[OnuID]
@@ -2856,7 +2854,7 @@
 		}
 	default:
 		logger.Errorw(ctx, "unsupported-tech", log.Fields{"tpInst": TpInst})
-		return
+		return olterrors.NewErrInvalidValue(log.Fields{"tpInst": TpInst}, nil)
 	}
 
 	if len(gemPorts) == 1 {
@@ -2900,6 +2898,7 @@
 			//Adding DHCP upstream flow
 			if err := f.addDHCPTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
+				return err
 			}
 
 		} else if ipProto.(uint32) == IgmpProto {
@@ -2911,10 +2910,11 @@
 					"classifier-info:": classifierInfo})
 			if err := f.addIGMPTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
+				return err
 			}
 		} else {
 			logger.Errorw(ctx, "invalid-classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
-			return
+			return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo, "action": actionInfo}, nil)
 		}
 	} else if ethType, ok := classifierInfo[EthType]; ok {
 		if ethType.(uint32) == EapEthType {
@@ -2932,6 +2932,7 @@
 			}
 			if err := f.addEthTypeBasedFlow(ctx, flowContext, vlanID, ethType.(uint32)); err != nil {
 				logger.Warn(ctx, err)
+				return err
 			}
 		} else if ethType.(uint32) == PPPoEDEthType {
 			logger.Infow(ctx, "adding-pppoed-flow", log.Fields{
@@ -2944,6 +2945,7 @@
 			//Adding PPPOED upstream flow
 			if err := f.addUpstreamTrapFlow(ctx, flowContext); err != nil {
 				logger.Warn(ctx, err)
+				return err
 			}
 		}
 	} else if direction == tp_pb.Direction_UPSTREAM {
@@ -2955,6 +2957,7 @@
 		//Adding HSIA upstream flow
 		if err := f.addUpstreamDataPathFlow(ctx, flowContext); err != nil {
 			logger.Warn(ctx, err)
+			return err
 		}
 	} else if direction == tp_pb.Direction_DOWNSTREAM {
 		logger.Infow(ctx, "adding-downstream-data-rule", log.Fields{
@@ -2965,17 +2968,17 @@
 		//Adding HSIA downstream flow
 		if err := f.addDownstreamDataPathFlow(ctx, flowContext); err != nil {
 			logger.Warn(ctx, err)
+			return err
 		}
 	} else {
-		logger.Errorw(ctx, "invalid-flow-type-to-handle",
-			log.Fields{
-				"intf-id":    intfID,
-				"onu-id":     onuID,
-				"uni-id":     uniID,
-				"classifier": classifierInfo,
-				"action":     actionInfo,
-				"flow":       flow})
-		return
+		return olterrors.NewErrInvalidValue(log.Fields{
+			"intf-id":    intfID,
+			"onu-id":     onuID,
+			"uni-id":     uniID,
+			"classifier": classifierInfo,
+			"action":     actionInfo,
+			"flow":       flow},
+			nil).Log()
 	}
 	// Send Techprofile download event to child device in go routine as it takes time
 	go func() {
@@ -2983,6 +2986,7 @@
 			logger.Warn(ctx, err)
 		}
 	}()
+	return nil
 }
 
 func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 45cd79e..7154225 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -1212,8 +1212,12 @@
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+			err := flowMgr[tt.args.intfID].checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
 				tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
+			if err != nil {
+				t.Error("check-and-add-flow failed", err)
+				return
+			}
 		})
 	}
 }