VOL-3505 Send an ofp_error_msg on flow add/delete error

Change-Id: I3791d4ab7ae0f7730f52988234d0c99af2cea75f
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 5817c64..f1a6ce8 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/hex"
+	"fmt"
 	"sync"
 
 	"github.com/golang/protobuf/ptypes/empty"
@@ -139,6 +140,26 @@
 	}
 }
 
+func (q *Manager) SendFlowChangeEvent(ctx context.Context, deviceID string, res []error, xid uint32) {
+	logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID, "flowId": xid})
+	errorType := openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED
+	q.changeEventQueue <- openflow_13.ChangeEvent{
+		Id: deviceID,
+		Event: &openflow_13.ChangeEvent_Error{
+			Error: &openflow_13.OfpErrorMsg{
+				Header: &openflow_13.OfpHeader{
+					Version: 0,
+					Type:    openflow_13.OfpType_OFPT_FLOW_MOD,
+					Xid:     xid,
+				},
+				Type: uint32(errorType),
+				Code: uint32(openflow_13.OfpFlowModFailedCode_OFPFMFC_UNKNOWN),
+				Data: []byte(fmt.Sprintf("%v", res[:])),
+			},
+		},
+	}
+}
+
 // ReceiveChangeEvents receives change in events
 func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
 	ctx := context.Background()
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 9247e57..6c3a2b8 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -47,13 +47,13 @@
 }
 
 //updateFlowTable updates the flow table of that logical device
-func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
 	logger.Debug(ctx, "UpdateFlowTable")
 	if flow == nil {
 		return nil
 	}
 
-	switch flow.GetCommand() {
+	switch flow.FlowMod.GetCommand() {
 	case ofp.OfpFlowModCommand_OFPFC_ADD:
 		return agent.flowAdd(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_DELETE:
@@ -66,11 +66,12 @@
 		return agent.flowModifyStrict(flow)
 	}
 	return status.Errorf(codes.Internal,
-		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
+		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.FlowMod.GetCommand())
 }
 
 //flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+	mod := flowUpdate.FlowMod
 	logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
@@ -82,7 +83,7 @@
 	}
 	var updated bool
 	var changed bool
-	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
+	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
 		logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
 		return err
 	}
@@ -95,9 +96,10 @@
 
 }
 
-func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
+func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, flowUpdate *ofp.FlowTableUpdate) (bool, bool, error) {
 	changed := false
 	updated := false
+	mod := flowUpdate.FlowMod
 	var flowToReplace *ofp.OfpFlowStats
 
 	//if flow is not found in the map, create a new entry, otherwise get the existing one.
@@ -165,6 +167,7 @@
 			}
 		}
 		respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, toMetadata(flowMeterConfig))
+
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
@@ -184,6 +187,8 @@
 						"groups":            groups,
 					})
 				}
+				// send event
+				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 			}
 		}()
 	}
@@ -237,8 +242,9 @@
 }
 
 //flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	logger.Debug(ctx, "flowDelete")
+	mod := flowUpdate.FlowMod
 	if mod == nil {
 		return nil
 	}
@@ -332,6 +338,8 @@
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
 				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
 				// TODO: Revert the flow deletion
+				// send event, and allow any queued events to be sent as well
+				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 			}
 		}()
 	}
@@ -340,7 +348,8 @@
 }
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+	mod := flowUpdate.FlowMod
 	logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
 	if mod == nil {
 		return nil
@@ -409,7 +418,9 @@
 				"logical-device-id": agent.logicalDeviceID,
 				"errors":            res,
 			})
-			//TODO: Revert flow changes
+			// TODO: Revert flow changes
+			// send event, and allow any queued events to be sent as well
+			agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
 		}
 	}()
 
@@ -417,12 +428,12 @@
 }
 
 //flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(flowUpdate *ofp.FlowTableUpdate) error {
 	return errors.New("flowModify not implemented")
 }
 
 //flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(flowUpdate *ofp.FlowTableUpdate) error {
 	return errors.New("flowModifyStrict not implemented")
 }
 
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 8cffd78..2fca7c5 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -449,7 +449,7 @@
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
 	}
-	return &empty.Empty{}, agent.updateFlowTable(ctx, flow.FlowMod)
+	return &empty.Empty{}, agent.updateFlowTable(ctx, flow)
 }
 
 // UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response