[VOL-1668] Remove default flow rules from the Core
This commit cleans up the default flow rules introduced with VOL-1628:
the logical device agent no longer generates or caches per-device
default flows (DefaultFlowRules, includeDefaultFlows,
rootDeviceDefaultRules and leafDeviceDefaultRules are removed), and the
flow decomposer now works only from the flows and groups defined on the
logical device, returning rules just for the devices those flows touch.
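The caller-visible change is the DecomposeRules signature, which loses
its includeDefaultFlows argument. Before/after, taken from the
logical_device_agent.go hunk below (flows and groups are whatever the
caller already holds):

    // Before (VOL-1628): the caller decided whether Core defaults were folded in.
    deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, agent.includeDefaultFlows)

    // After: the decomposer uses only the supplied flows/groups and filters the
    // result down to the devices those flows actually touch.
    deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups)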
Change-Id: I965e07b9bd2be1f25dca2cdf18a90aa4c16eea88
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index bb25b4b..176634d 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -35,26 +35,23 @@
)
type LogicalDeviceAgent struct {
- logicalDeviceId string
- //lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- exitChannel chan int
- deviceGraph *graph.DeviceGraph
- DefaultFlowRules *fu.DeviceRules
- flowProxy *model.Proxy
- groupProxy *model.Proxy
- ldProxy *model.Proxy
- portProxies map[string]*model.Proxy
- portProxiesLock sync.RWMutex
- lockLogicalDevice sync.RWMutex
- logicalPortsNo map[uint32]bool //value is true for NNI port
- lockLogicalPortsNo sync.RWMutex
- flowDecomposer *fd.FlowDecomposer
- includeDefaultFlows bool
- defaultTimeout int64
+ logicalDeviceId string
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ deviceGraph *graph.DeviceGraph
+ flowProxy *model.Proxy
+ groupProxy *model.Proxy
+ ldProxy *model.Proxy
+ portProxies map[string]*model.Proxy
+ portProxiesLock sync.RWMutex
+ lockLogicalDevice sync.RWMutex
+ logicalPortsNo map[uint32]bool //value is true for NNI port
+ lockLogicalPortsNo sync.RWMutex
+ flowDecomposer *fd.FlowDecomposer
+ defaultTimeout int64
}
func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
@@ -73,7 +70,6 @@
agent.portProxiesLock = sync.RWMutex{}
agent.lockLogicalPortsNo = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
- agent.includeDefaultFlows = true
agent.defaultTimeout = timeout
return &agent
}
@@ -156,8 +152,6 @@
return status.Error(codes.Internal, "logical-device-proxy-null")
}
- agent.includeDefaultFlows = true
-
return nil
}
@@ -551,14 +545,11 @@
}
if changed {
// Launch a routine to decompose the flows
- if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups, agent.includeDefaultFlows); err != nil {
+ if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups); err != nil {
log.Errorw("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
- // We no longer need to sent the default flows, unless there is a change in device topology
- agent.includeDefaultFlows = false
-
// Update model
flowsToUpdate := &ofp.Flows{}
if lDevice.Flows != nil {
@@ -572,10 +563,10 @@
return nil
}
-func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups, includeDefaultFlows bool) error {
+func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups) error {
log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, includeDefaultFlows)
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups)
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
chnlsList := make([]chan interface{}, 0)
@@ -1022,99 +1013,6 @@
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
}
-func (agent *LogicalDeviceAgent) rootDeviceDefaultRules() *fu.FlowsAndGroups {
- return fu.NewFlowsAndGroups()
-}
-
-func (agent *LogicalDeviceAgent) leafDeviceDefaultRules(deviceId string) *fu.FlowsAndGroups {
- fg := fu.NewFlowsAndGroups()
- var device *voltha.Device
- var err error
- if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
- return fg
- }
- //set the upstream and downstream ports
- upstreamPorts := make([]*voltha.Port, 0)
- downstreamPorts := make([]*voltha.Port, 0)
- for _, port := range device.Ports {
- if port.Type == voltha.Port_PON_ONU || port.Type == voltha.Port_VENET_ONU {
- upstreamPorts = append(upstreamPorts, port)
- } else if port.Type == voltha.Port_ETHERNET_UNI {
- downstreamPorts = append(downstreamPorts, port)
- }
- }
- //it is possible that the downstream ports are not created, but the flow_decomposition has already
- //kicked in. In such scenarios, cut short the processing and return.
- if len(downstreamPorts) == 0 || len(upstreamPorts) == 0 {
- return fg
- }
- // set up the default flows
- var fa *fu.FlowArgs
- fa = &fu.FlowArgs{
- KV: fu.OfpFlowModArgs{"priority": 500},
- MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(downstreamPorts[0].PortNo),
- fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
- },
- Actions: []*ofp.OfpAction{
- fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
- fu.Output(upstreamPorts[0].PortNo),
- },
- }
- fg.AddFlow(fu.MkFlowStat(fa))
-
- fa = &fu.FlowArgs{
- KV: fu.OfpFlowModArgs{"priority": 500},
- MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(downstreamPorts[0].PortNo),
- fu.VlanVid(0),
- },
- Actions: []*ofp.OfpAction{
- fu.PushVlan(0x8100),
- fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
- fu.Output(upstreamPorts[0].PortNo),
- },
- }
- fg.AddFlow(fu.MkFlowStat(fa))
-
- fa = &fu.FlowArgs{
- KV: fu.OfpFlowModArgs{"priority": 500},
- MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(upstreamPorts[0].PortNo),
- fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan),
- },
- Actions: []*ofp.OfpAction{
- fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
- fu.Output(downstreamPorts[0].PortNo),
- },
- }
- fg.AddFlow(fu.MkFlowStat(fa))
-
- return fg
-}
-
-func (agent *LogicalDeviceAgent) generateDefaultRules() *fu.DeviceRules {
- rules := fu.NewDeviceRules()
- var ld *voltha.LogicalDevice
- var err error
- if ld, err = agent.GetLogicalDevice(); err != nil {
- log.Warnw("no-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- return rules
- }
-
- deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
- for deviceId := range deviceNodeIds {
- if deviceId == ld.RootDeviceId {
- rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
- }
- }
- return rules
-}
-
-func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
- return agent.DefaultFlowRules
-}
-
//GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
//returns their port numbers. This function is invoked only during flow decomposition where the lock on the logical
//device is already held. Therefore it is safe to retrieve the logical device without lock.
@@ -1138,12 +1036,11 @@
return agent.deviceGraph
}
-//updateRoutes redo the device graph if not done already and setup the default rules as well
+//updateRoutes rebuilds the device graph if not done already
func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "device": device.Id, "port": port})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
@@ -1155,17 +1052,6 @@
//TODO: Find a better way to refresh only missing routes
agent.deviceGraph.ComputeRoutes(lDevice.Ports)
}
- deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
- for deviceId := range deviceNodeIds {
- if deviceId == agent.rootDeviceId {
- rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
- }
- }
- agent.DefaultFlowRules = rules
-
- // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
- // one when a flow request is received.
- agent.includeDefaultFlows = true
agent.deviceGraph.Print()
return nil
}
@@ -1175,22 +1061,10 @@
log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
agent.deviceGraph.AddPort(lp)
- deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
- for deviceId := range deviceNodeIds {
- if deviceId == agent.rootDeviceId {
- rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
- }
- }
- agent.DefaultFlowRules = rules
-
- // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
- // one when a flow request is received.
- agent.includeDefaultFlows = true
agent.deviceGraph.Print()
}
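For context on what the deleted leafDeviceDefaultRules used to provide:
three VLAN-translation flows per ONU between its first ETHERNET_UNI
(downstream) port and its first PON_ONU/VENET_ONU (upstream) port. A
minimal sketch of the untagged upstream case, rebuilt from the deleted
code; the wrapper function and its parameters are hypothetical, while
the fu/ofp helpers and the priority of 500 are exactly what the old
code used (fu is rw_core/utils, ofp is the voltha-protos openflow_13
package):

    // Hypothetical helper, for reference only: it rebuilds one of the removed
    // defaults, matching untagged (VID 0) traffic on the UNI port, tagging it
    // with the device VLAN and forwarding it to the PON port. Equivalent rules
    // must now arrive via the flows programmed on the logical device.
    func exampleUpstreamDefault(fg *fu.FlowsAndGroups, uniPortNo, ponPortNo, deviceVlan uint32) {
        fa := &fu.FlowArgs{
            KV: fu.OfpFlowModArgs{"priority": 500},
            MatchFields: []*ofp.OfpOxmOfbField{
                fu.InPort(uniPortNo),
                fu.VlanVid(0),
            },
            Actions: []*ofp.OfpAction{
                fu.PushVlan(0x8100),
                fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | deviceVlan)),
                fu.Output(ponPortNo),
            },
        }
        fg.AddFlow(fu.MkFlowStat(fa))
    }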
diff --git a/rw_core/coreIf/logical_device_agent_if.go b/rw_core/coreIf/logical_device_agent_if.go
index c2614b2..704f268 100644
--- a/rw_core/coreIf/logical_device_agent_if.go
+++ b/rw_core/coreIf/logical_device_agent_if.go
@@ -21,7 +21,6 @@
import (
"github.com/opencord/voltha-go/rw_core/graph"
- "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-protos/go/voltha"
)
@@ -29,7 +28,6 @@
type LogicalDeviceAgent interface {
GetLogicalDevice() (*voltha.LogicalDevice, error)
GetDeviceGraph() *graph.DeviceGraph
- GetAllDefaultRules() *utils.DeviceRules
GetWildcardInputPorts(excludePort ...uint32) []uint32
GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop
}
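With GetAllDefaultRules dropped from the interface, an implementation
of coreIf.LogicalDeviceAgent (such as the test double in
flow_decomposer_test.go) only needs to satisfy the four remaining
methods. A minimal stub sketch; the type name is made up and the
bodies are deliberately empty placeholders:

    type stubAgent struct{}

    func (s *stubAgent) GetLogicalDevice() (*voltha.LogicalDevice, error)     { return nil, nil }
    func (s *stubAgent) GetDeviceGraph() *graph.DeviceGraph                   { return nil }
    func (s *stubAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 { return nil }
    func (s *stubAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
        return nil
    }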
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 06d1b8b..98d5092 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -41,9 +41,8 @@
}
//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups, includeDefaultFlows bool) *fu.DeviceRules {
- rules := agent.GetAllDefaultRules()
- deviceRules := rules.Copy()
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+ deviceRules := *fu.NewDeviceRules()
devicesToUpdate := make(map[string]string)
groupMap := make(map[uint32]*ofp.OfpGroupEntry)
@@ -60,12 +59,7 @@
devicesToUpdate[deviceId] = deviceId
}
}
- if includeDefaultFlows {
- return deviceRules
- }
- updatedDeviceRules := deviceRules.FilterRules(devicesToUpdate)
-
- return updatedDeviceRules
+ return deviceRules.FilterRules(devicesToUpdate)
}
// Handles special case of any controller-bound flow for a parent device
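Net behaviour of DecomposeRules after this change: it starts from an
empty fu.DeviceRules and returns entries only for the devices that the
supplied flows and groups actually touch. A hypothetical caller-side
sketch using only names that appear in this patch (fd, agent, flows and
groups assumed to be in scope):

    deviceRules := fd.DecomposeRules(agent, flows, groups)
    for deviceId, fg := range deviceRules.Rules {
        // No Core-generated defaults are mixed in any more; every entry here was
        // derived from a flow or group defined on the logical device.
        log.Debugw("per-device-rules", log.Fields{
            "deviceId": deviceId,
            "flows":    fg.Flows.Len(),
            "groups":   fg.Groups.Len(),
        })
    }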
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index 6464c5d..42e42e6 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -482,29 +482,14 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
- assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
- assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
+ assert.Nil(t, onu1FlowAndGroup)
assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
fa = &fu.FlowArgs{
- MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(2),
- fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
- },
- Actions: []*ofp.OfpAction{
- fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
- fu.Output(1),
- },
- }
- expectedOnu1Flow := fu.MkFlowStat(fa)
- derivedFlow := onu1FlowAndGroup.GetFlow(0)
- assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
-
- fa = &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": 1000},
MatchFields: []*ofp.OfpOxmOfbField{
fu.InPort(1),
@@ -519,7 +504,7 @@
},
}
expectedOltFlow := fu.MkFlowStat(fa)
- derivedFlow = oltFlowAndGroup.GetFlow(0)
+ derivedFlow := oltFlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
fa = &fu.FlowArgs{
@@ -564,29 +549,14 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
- assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
- assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
+ assert.Nil(t, onu1FlowAndGroup)
assert.Equal(t, 2, oltFlowAndGroup.Flows.Len())
assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
fa = &fu.FlowArgs{
- MatchFields: []*ofp.OfpOxmOfbField{
- fu.InPort(2),
- fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
- },
- Actions: []*ofp.OfpAction{
- fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 101)),
- fu.Output(1),
- },
- }
- expectedOnu1Flow := fu.MkFlowStat(fa)
- derivedFlow := onu1FlowAndGroup.GetFlow(0)
- assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
-
- fa = &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": 1000},
MatchFields: []*ofp.OfpOxmOfbField{
fu.InPort(1),
@@ -605,7 +575,7 @@
},
}
expectedOltFlow := fu.MkFlowStat(fa)
- derivedFlow = oltFlowAndGroup.GetFlow(0)
+ derivedFlow := oltFlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOltFlow.String(), derivedFlow.String())
fa = &fu.FlowArgs{
@@ -662,10 +632,10 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
- assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
+ assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
@@ -684,7 +654,7 @@
},
}
expectedOnu1Flow := fu.MkFlowStat(fa)
- derivedFlow := onu1FlowAndGroup.GetFlow(1)
+ derivedFlow := onu1FlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
fa = &fu.FlowArgs{
@@ -739,10 +709,10 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
- assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
+ assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
@@ -777,7 +747,7 @@
},
}
expectedOnu1Flow := fu.MkFlowStat(fa1)
- derivedFlow = onu1FlowAndGroup.GetFlow(1)
+ derivedFlow = onu1FlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
}
@@ -813,10 +783,10 @@
groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ga)}}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
- assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
+ assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
assert.Equal(t, 0, onu1FlowAndGroup.Groups.Len())
assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())
@@ -851,6 +821,6 @@
},
}
expectedOnu1Flow := fu.MkFlowStat(fa)
- derivedFlow = onu1FlowAndGroup.GetFlow(1)
+ derivedFlow = onu1FlowAndGroup.GetFlow(0)
assert.Equal(t, expectedOnu1Flow.String(), derivedFlow.String())
}
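One practical consequence the updated tests highlight: a device that
none of the logical flows touch no longer gets an entry at all (hence
the new assert.Nil on onu1), and flow indices shift down because the
Core default flow is gone. Inside a test like the ones above, with
expected built via fu.MkFlowStat, a guarded lookup keeps that explicit.
This is a sketch, not code from this patch:

    if fg, ok := deviceRules.Rules["onu1"]; ok && fg != nil {
        // Index 0 is now the first decomposed flow; the Core default that used
        // to occupy it is gone.
        derivedFlow := fg.GetFlow(0)
        assert.Equal(t, expected.String(), derivedFlow.String())
    }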