[VOL-1385] Remove parent's device flows after child deletion
This commit fixes the following:
1) Do not automatically raise an error when no routes can be
found when decomposing a flow. In some cases flows can still
be decomposed (e.g. some trap flows).
2) Delete flows from a parent device when receiving delete flow
instructions from the OF controller after a child device has
been deleted (previously was failing as no route could be
obtained).
Change-Id: I33dd45d52626146f0a6b4668048c979b5c931f9c
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 8d18e10..264b44f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -574,6 +574,36 @@
return nil
}
+//filterOutFlows removes flows from a device using the uni-port as filter
+func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
+ existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+ var flowsToDelete []*ofp.OfpFlowStats
+
+ // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
+ for _, flow := range existingFlows.Items {
+ if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ }
+ logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
+ if len(flowsToDelete) == 0 {
+ return nil
+ }
+
+ response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDelete, []*ofp.OfpGroupEntry{}, flowMetadata)
+ if err != nil {
+ return err
+ }
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
+}
+
func (agent *Agent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index c6e4e73..42cbc4e 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -563,7 +563,10 @@
}
logger.Debug("Generation of device route required")
if err := agent.buildRoutes(ctx); err != nil {
- return err
+ // No Route is not an error
+ if !errors.Is(err, route.ErrNoRoute) {
+ return err
+ }
}
return nil
}
@@ -1023,19 +1026,32 @@
logger.Error("Meter-referred-in-flows-not-present")
return errors.New("Meter-referred-in-flows-not-present")
}
+
+ var respChnls []coreutils.Response
+ var partialRoute bool
deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
if err != nil {
- return err
+ // A no route error means no route exists between the ports specified in the flow. This can happen when the
+ // child device is deleted and a request to delete flows from the parent device is received
+ if !errors.Is(err, route.ErrNoRoute) {
+ logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": toDelete, "error": err})
+ return err
+ }
+ partialRoute = true
}
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ // Update the dB
if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
logger.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
// Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ if partialRoute {
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
+ } else {
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ }
// Wait for the responses
go func() {
@@ -1083,7 +1099,7 @@
ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
+ logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
response.Done()
@@ -1103,7 +1119,7 @@
ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
+ logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
response.Done()
@@ -1112,6 +1128,49 @@
return responses
}
+// getUNILogicalPortNo returns the UNI logical port number specified in the flow
+func (agent *LogicalAgent) getUNILogicalPortNo(flow *ofp.OfpFlowStats) (uint32, error) {
+ var uniPort uint32
+ inPortNo := fu.GetInPort(flow)
+ outPortNo := fu.GetOutPort(flow)
+ if agent.isNNIPort(inPortNo) {
+ uniPort = outPortNo
+ } else if agent.isNNIPort(outPortNo) {
+ uniPort = inPortNo
+ }
+ if uniPort != 0 {
+ return uniPort, nil
+ }
+ return 0, status.Errorf(codes.NotFound, "no-uni-port: %v", flow)
+}
+
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
+ logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
+ responses := make([]coreutils.Response, 0)
+ for _, flow := range flows.Items {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ uniPort, err := agent.getUNILogicalPortNo(flow)
+ if err != nil {
+ logger.Error("no-uni-port-in-flow", log.Fields{"deviceID": agent.rootDeviceID, "flow": flow, "error": err})
+ response.Error(err)
+ response.Done()
+ continue
+ }
+ logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
+ go func(uniPort uint32, metadata *voltha.FlowMetadata) {
+ ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ defer cancel()
+ if err := agent.deviceMgr.deleteParentFlows(ctx, agent.rootDeviceID, uniPort, metadata); err != nil {
+ logger.Error("flow-delete-failed", log.Fields{"device-id": agent.rootDeviceID, "error": err})
+ response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
+ }
+ response.Done()
+ }(uniPort, metadata)
+ }
+ return responses
+}
+
//flowDeleteStrict deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
logger.Debug("flowDeleteStrict")
@@ -1172,19 +1231,31 @@
logger.Error("meter-referred-in-flows-not-present")
return err
}
+ var respChnls []coreutils.Response
+ var partialRoute bool
deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
if err != nil {
- return err
+ // A no route error means no route exists between the ports specified in the flow. This can happen when the
+ // child device is deleted and a request to delete flows from the parent device is received
+ if !errors.Is(err, route.ErrNoRoute) {
+ logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
+ return err
+ }
+ partialRoute = true
}
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ // Update the dB
if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
logger.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
// Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ if partialRoute {
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+ } else {
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ }
// Wait for completion
go func() {
@@ -1530,7 +1601,7 @@
return routes, nil
}
}
- return nil, status.Errorf(codes.FailedPrecondition, "no upstream route from:%d to:%d", ingressPortNo, egressPortNo)
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
//treat it as if the output port is the first NNI of the OLT
var err error
@@ -1552,7 +1623,7 @@
return routes, nil
}
}
- return nil, status.Errorf(codes.FailedPrecondition, "no upstream route from:%d to:%d", ingressPortNo, egressPortNo)
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
//If egress port is not specified (nil), we can also can return a "half" route
if egressPortNo == 0 {
@@ -1563,7 +1634,7 @@
return routes, nil
}
}
- return nil, status.Errorf(codes.FailedPrecondition, "no downstream route from:%d to:%d", ingressPortNo, egressPortNo)
+ return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
// Return the pre-calculated route
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 48edc5b..cd73f35 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -764,6 +764,18 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
+// deleteParentFlows removes flows from the parent device based on specific attributes
+func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
+ logger.Debugw("deleteParentFlows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
+ if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
+ if !agent.isRootdevice {
+ return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
+ }
+ return agent.filterOutFlows(ctx, uniPort, metadata)
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceID)
+}
+
func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index 94594b0..bfeccf6 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -18,6 +18,7 @@
import (
"context"
+ "fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/coreif"
"github.com/opencord/voltha-go/rw_core/route"
@@ -468,17 +469,16 @@
deviceRules := fu.NewDeviceRules()
path, err := agent.GetRoute(ctx, inPortNo, outPortNo)
if err != nil {
- logger.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "error": err})
return deviceRules, err
}
switch len(path) {
case 0:
- return deviceRules, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", inPortNo, outPortNo)
+ return deviceRules, fmt.Errorf("no route from:%d to:%d :%w", inPortNo, outPortNo, route.ErrNoRoute)
case 2:
logger.Debugw("route-found", log.Fields{"ingressHop": path[0], "egressHop": path[1]})
default:
- return deviceRules, status.Errorf(codes.Aborted, "invalid route length %d", len(path))
+ return deviceRules, fmt.Errorf("invalid route length %d :%w", len(path), route.ErrNoRoute)
}
// Process controller bound flow
@@ -491,7 +491,9 @@
var ingressDevice *voltha.Device
var err error
if ingressDevice, err = fd.deviceMgr.GetDevice(ctx, path[0].DeviceID); err != nil {
- return deviceRules, err
+ // This can happen in a race condition where a device is deleted right after we obtain a
+ // route involving the device (GetRoute() above). Handle it as a no route event as well.
+ return deviceRules, fmt.Errorf("get-device-error :%v :%w", err, route.ErrNoRoute)
}
isUpstream := !ingressDevice.Root
if isUpstream { // Unicast OLT and ONU UL
diff --git a/rw_core/route/device_route.go b/rw_core/route/device_route.go
index a36c841..292ef83 100644
--- a/rw_core/route/device_route.go
+++ b/rw_core/route/device_route.go
@@ -18,6 +18,7 @@
import (
"context"
+ "errors"
"fmt"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -26,6 +27,8 @@
"sync"
)
+var ErrNoRoute = errors.New("no route")
+
// Hop represent a route hop
type Hop struct {
DeviceID string
@@ -95,7 +98,7 @@
}()
if len(lps) < 2 {
- return status.Error(codes.FailedPrecondition, "not-enough-logical-ports")
+ return fmt.Errorf("not enough logical port :%w", ErrNoRoute)
}
dr.reset()
@@ -112,8 +115,7 @@
}
}
if len(nniPorts) == 0 {
- err = status.Error(codes.FailedPrecondition, "no nni port")
- return err
+ return fmt.Errorf("no nni port :%w", ErrNoRoute)
}
var rootDevice *voltha.Device
var childDevice *voltha.Device