VOL-3505 Send an ofp_error_msg on flow add/delete error
Change-Id: I3791d4ab7ae0f7730f52988234d0c99af2cea75f
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 5817c64..f1a6ce8 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -19,6 +19,7 @@
import (
"context"
"encoding/hex"
+ "fmt"
"sync"
"github.com/golang/protobuf/ptypes/empty"
@@ -139,6 +140,26 @@
}
}
+// SendFlowChangeEvent queues an OpenFlow error change event for the given
+// device. The event wraps an OfpErrorMsg of type OFPET_FLOW_MOD_FAILED with
+// code OFPFMFC_UNKNOWN; its header carries xid, and its data is the "%v"
+// rendering of res.
+//
+// The event is written directly to q.changeEventQueue, so this call waits
+// until the channel accepts the value.
+func (q *Manager) SendFlowChangeEvent(ctx context.Context, deviceID string, res []error, xid uint32) {
+	logger.Debugw(ctx, "SendFlowChangeEvent", log.Fields{"device-id": deviceID, "xid": xid})
+	q.changeEventQueue <- openflow_13.ChangeEvent{
+		Id: deviceID,
+		Event: &openflow_13.ChangeEvent_Error{
+			Error: &openflow_13.OfpErrorMsg{
+				Header: &openflow_13.OfpHeader{
+					// NOTE(review): version is left at 0 here; presumably the
+					// consumer fills in the negotiated version - TODO confirm.
+					Version: 0,
+					Type:    openflow_13.OfpType_OFPT_FLOW_MOD,
+					Xid:     xid,
+				},
+				Type: uint32(openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED),
+				Code: uint32(openflow_13.OfpFlowModFailedCode_OFPFMFC_UNKNOWN),
+				Data: []byte(fmt.Sprintf("%v", res)),
+			},
+		},
+	}
+}
+
// ReceiveChangeEvents receives change in events
func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
ctx := context.Background()
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 9247e57..6c3a2b8 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -47,13 +47,13 @@
}
//updateFlowTable updates the flow table of that logical device
-func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
logger.Debug(ctx, "UpdateFlowTable")
if flow == nil {
return nil
}
- switch flow.GetCommand() {
+ switch flow.FlowMod.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_DELETE:
@@ -66,11 +66,12 @@
return agent.flowModifyStrict(flow)
}
return status.Errorf(codes.Internal,
- "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
+ "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.FlowMod.GetCommand())
}
//flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+ mod := flowUpdate.FlowMod
logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
@@ -82,7 +83,7 @@
}
var updated bool
var changed bool
- if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
+ if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
return err
}
@@ -95,9 +96,10 @@
}
-func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
+func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, flowUpdate *ofp.FlowTableUpdate) (bool, bool, error) {
changed := false
updated := false
+ mod := flowUpdate.FlowMod
var flowToReplace *ofp.OfpFlowStats
//if flow is not found in the map, create a new entry, otherwise get the existing one.
@@ -165,6 +167,7 @@
}
}
respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, toMetadata(flowMeterConfig))
+
// Create the go routines to wait
go func() {
// Wait for completion
@@ -184,6 +187,8 @@
"groups": groups,
})
}
+ // send event
+ agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
}
}()
}
@@ -237,8 +242,9 @@
}
//flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
logger.Debug(ctx, "flowDelete")
+ mod := flowUpdate.FlowMod
if mod == nil {
return nil
}
@@ -332,6 +338,8 @@
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
// TODO: Revert the flow deletion
+ // send event, and allow any queued events to be sent as well
+ agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
}
}()
}
@@ -340,7 +348,8 @@
}
//flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+ mod := flowUpdate.FlowMod
logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
if mod == nil {
return nil
@@ -409,7 +418,9 @@
"logical-device-id": agent.logicalDeviceID,
"errors": res,
})
- //TODO: Revert flow changes
+ // TODO: Revert flow changes
+ // send event, and allow any queued events to be sent as well
+ agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid)
}
}()
@@ -417,12 +428,12 @@
}
//flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(flowUpdate *ofp.FlowTableUpdate) error {
return errors.New("flowModify not implemented")
}
//flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(flowUpdate *ofp.FlowTableUpdate) error {
return errors.New("flowModifyStrict not implemented")
}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 8cffd78..2fca7c5 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -449,7 +449,7 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
}
- return &empty.Empty{}, agent.updateFlowTable(ctx, flow.FlowMod)
+ return &empty.Empty{}, agent.updateFlowTable(ctx, flow)
}
// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response