VOL-3505 Send an ofp_error_msg on flow add/delete error

Change-Id: I3791d4ab7ae0f7730f52988234d0c99af2cea75f
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 9247e57..6c3a2b8 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -47,13 +47,13 @@
 }
 
 //updateFlowTable updates the flow table of that logical device
-func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
 	logger.Debug(ctx, "UpdateFlowTable")
 	if flow == nil {
 		return nil
 	}
 
-	switch flow.GetCommand() {
+	switch flow.FlowMod.GetCommand() {
 	case ofp.OfpFlowModCommand_OFPFC_ADD:
 		return agent.flowAdd(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_DELETE:
@@ -66,11 +66,12 @@
 		return agent.flowModifyStrict(flow)
 	}
 	return status.Errorf(codes.Internal,
-		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
+		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.FlowMod.GetCommand())
 }
 
 //flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+	mod := flowUpdate.FlowMod
 	logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
@@ -82,7 +83,7 @@
 	}
 	var updated bool
 	var changed bool
-	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
+	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
 		logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
 		return err
 	}
@@ -95,9 +96,10 @@
 
 }
 
-func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
+func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, flowUpdate *ofp.FlowTableUpdate) (bool, bool, error) {
 	changed := false
 	updated := false
+	mod := flowUpdate.FlowMod
 	var flowToReplace *ofp.OfpFlowStats
 
 	//if flow is not found in the map, create a new entry, otherwise get the existing one.
@@ -165,6 +167,7 @@
 			}
 		}
 		respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, toMetadata(flowMeterConfig))
+
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
@@ -184,6 +187,8 @@
 						"groups":            groups,
 					})
 				}
+				// send event
+				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 			}
 		}()
 	}
@@ -237,8 +242,9 @@
 }
 
 //flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	logger.Debug(ctx, "flowDelete")
+	mod := flowUpdate.FlowMod
 	if mod == nil {
 		return nil
 	}
@@ -332,6 +338,8 @@
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
 				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 				// TODO: Revert the flow deletion
+				// send event, and allow any queued events to be sent as well
+				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 			}
 		}()
 	}
@@ -340,7 +348,8 @@
 }
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+	mod := flowUpdate.FlowMod
 	logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
 	if mod == nil {
 		return nil
@@ -409,7 +418,9 @@
 				"logical-device-id": agent.logicalDeviceID,
 				"errors":            res,
 			})
-			//TODO: Revert flow changes
+			// TODO: Revert flow changes
+			// send event, and allow any queued events to be sent as well
+			agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 		}
 	}()
 
@@ -417,12 +428,12 @@
 }
 
 //flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(flowUpdate *ofp.FlowTableUpdate) error {
 	return errors.New("flowModify not implemented")
 }
 
 //flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(flowUpdate *ofp.FlowTableUpdate) error {
 	return errors.New("flowModifyStrict not implemented")
 }