[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Improve the performance of adding a flow to a logical device,
decomposing the flow into parent and child device flows and sending
those flows to the adapters.
2) Format a number of files with gofmt.
3) Ensure the device graph cache gets updated when a new port that
belongs to a device already in the cache is added to the graph.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index ec2904f..980420a 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -22,11 +22,11 @@
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/coreIf"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"math/big"
)
@@ -751,9 +751,10 @@
}
//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups, includeDefaultFlows bool) *fu.DeviceRules {
rules := agent.GetAllDefaultRules()
deviceRules := rules.Copy()
+ devicesToUpdate := make(map[string]string)
groupMap := make(map[uint32]*ofp.OfpGroupEntry)
for _, groupEntry := range groups.Items {
@@ -766,9 +767,15 @@
for deviceId, flowAndGroups := range decomposedRules.Rules {
deviceRules.CreateEntryIfNotExist(deviceId)
deviceRules.Rules[deviceId].AddFrom(flowAndGroups)
+ devicesToUpdate[deviceId] = deviceId
}
}
- return deviceRules
+ if includeDefaultFlows {
+ return deviceRules
+ }
+ updatedDeviceRules := deviceRules.FilterRules(devicesToUpdate)
+
+ return updatedDeviceRules
}
// Handles special case of any controller-bound flow for a parent device
@@ -804,6 +811,7 @@
}
}
}
+
return newDeviceRules
}
@@ -869,6 +877,7 @@
}
}
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+
return deviceRules
}
@@ -1052,6 +1061,7 @@
fg.AddFlow(MkFlowStat(fa))
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
}
+
return deviceRules
}
@@ -1230,10 +1240,9 @@
inPortNo := GetInPort(flow)
outPortNo := GetOutPort(flow)
-
deviceRules := fu.NewDeviceRules()
-
route := agent.GetRoute(inPortNo, outPortNo)
+
switch len(route) {
case 0:
log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index d27fd21..e5c4bbd 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -18,10 +18,10 @@
import (
"errors"
"github.com/opencord/voltha-go/common/log"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"github.com/stretchr/testify/assert"
"testing"
@@ -446,7 +446,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -526,7 +526,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -622,7 +622,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -697,7 +697,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -770,7 +770,7 @@
groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{MkGroupStat(ga)}}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())