[VOL-2688] Improve core model performance
This commit addresses the low-hanging performance hogs in the
core model. In particular, the following changes are made:
1) Remove proto message comparisons where possible. Deep comparison of
proto messages is quite expensive.
2) Since the Core already holds a lock on the device/logicaldevice/
adapters/etc. before invoking the model proxy, there is no need for
the latter to take an additional lock on these artifacts during an
update.
3) The model creates a watch on every artifact it adds to the KV
store. Since the next Voltha release will not run Voltha Cores in
pairs, there is no point in keeping these watches (there is only
1 Core that will ever update an artifact in the next deployment).
This update removes these watches.
4) Additional unit tests have been created, mostly around flows, in an
attempt to exercise both the core and the model further.
Change-Id: Ieaf1f6b9b05c56e819600bc55b46a05f73b8efcf
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 346198d..d07a095 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -57,6 +57,7 @@
type isDeviceConditionSatisfied func(ld *voltha.Device) bool
type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
+type isConditionSatisfied func() bool
func init() {
_, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
@@ -193,7 +194,7 @@
d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
if d != nil && d.ParentId != "" {
ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
- if ld != nil && verificationFunction(ld) {
+ if verificationFunction(ld) {
ch <- 1
break
}
@@ -270,3 +271,29 @@
return fmt.Errorf("timeout-waiting-logical-devices")
}
}
+
+func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ if verificationFunction() {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-condition")
+ }
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 877d991..42d628a 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1457,7 +1457,8 @@
//Remove the associated peer ports on the parent device
if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
- return err
+ // At this stage, the parent device may also have been deleted. Just log and keep processing.
+ log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
}
if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 1abc565..6122b51 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -19,7 +19,13 @@
"context"
"errors"
"fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "math/rand"
+ "os"
+ "runtime"
+ "runtime/pprof"
"strings"
+ "sync"
"testing"
"time"
@@ -39,16 +45,19 @@
)
type NBTest struct {
- etcdServer *lm.EtcdServer
- core *Core
- kClient kafka.Client
- kvClientPort int
- numONUPerOLT int
- oltAdapterName string
- onuAdapterName string
- coreInstanceID string
- defaultTimeout time.Duration
- maxTimeout time.Duration
+ etcdServer *lm.EtcdServer
+ core *Core
+ kClient kafka.Client
+ kvClientPort int
+ numONUPerOLT int
+ startingUNIPortNo int
+ oltAdapter *cm.OLTAdapter
+ onuAdapter *cm.ONUAdapter
+ oltAdapterName string
+ onuAdapterName string
+ coreInstanceID string
+ defaultTimeout time.Duration
+ maxTimeout time.Duration
}
func newNBTest() *NBTest {
@@ -64,8 +73,8 @@
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-nbi-test"
- test.defaultTimeout = 10 * time.Second
- test.maxTimeout = 20 * time.Second
+ test.defaultTimeout = 50 * time.Second
+ test.maxTimeout = 300 * time.Second
return test
}
@@ -99,9 +108,10 @@
if err != nil {
log.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
}
- if adapter, ok := (oltAdapter).(*cm.OLTAdapter); ok {
- nb.numONUPerOLT = adapter.GetNumONUPerOLT()
- }
+ nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
// Register the adapter
registrationData := &voltha.Adapter{
Id: nb.oltAdapterName,
@@ -116,9 +126,12 @@
}
// Setup the mock ONU adapter
- if _, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName); err != nil {
+ onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+ if err != nil {
log.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
}
+ nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
+
// Register the adapter
registrationData = &voltha.Adapter{
Id: nb.onuAdapterName,
@@ -191,59 +204,64 @@
assert.Nil(t, err)
assert.NotNil(t, devices)
- // Wait until devices are in the correct states
+ // A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
}
- for _, d := range devices.Items {
- err = waitUntilDeviceReadiness(d.Id, nb.maxTimeout, vFunction, nbi)
- assert.Nil(t, err)
- assert.NotNil(t, d)
- }
- // Get the latest device updates as they may have changed since last list devices
- updatedDevices, err := nbi.ListDevices(getContext(), &empty.Empty{})
- assert.Nil(t, err)
- assert.NotNil(t, devices)
- for _, d := range updatedDevices.Items {
- assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
- assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
- assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
- assert.Equal(t, d.Type, d.Adapter)
- assert.NotEqual(t, "", d.MacAddress)
- assert.NotEqual(t, "", d.SerialNumber)
- if d.Type == "olt_adapter_mock" {
- assert.Equal(t, true, d.Root)
- assert.NotEqual(t, "", d.Id)
- assert.NotEqual(t, "", d.ParentId)
- assert.Nil(t, d.ProxyAddress)
- } else if d.Type == "onu_adapter_mock" {
- assert.Equal(t, false, d.Root)
- assert.NotEqual(t, uint32(0), d.Vlan)
- assert.NotEqual(t, "", d.Id)
- assert.NotEqual(t, "", d.ParentId)
- assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
- assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
- } else {
- assert.Error(t, errors.New("invalid-device-type"))
- }
- assert.Equal(t, 2, len(d.Ports))
- for _, p := range d.Ports {
- assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
- assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
- if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
- assert.Equal(t, 0, len(p.Peers))
- } else if p.Type == voltha.Port_PON_OLT {
- assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
- assert.Equal(t, uint32(1), p.PortNo)
- } else if p.Type == voltha.Port_PON_ONU {
- assert.Equal(t, 1, len(p.Peers))
- assert.Equal(t, uint32(1), p.PortNo)
+ var wg sync.WaitGroup
+ for _, device := range devices.Items {
+ wg.Add(1)
+ go func(wg *sync.WaitGroup, device *voltha.Device) {
+ // Wait until the device is in the right state
+ err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+ assert.Nil(t, err)
+
+ // Now, verify the details of the device. First get the latest update
+ d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+ assert.Nil(t, err)
+ assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+ assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+ assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+ assert.Equal(t, d.Type, d.Adapter)
+ assert.NotEqual(t, "", d.MacAddress)
+ assert.NotEqual(t, "", d.SerialNumber)
+
+ if d.Type == "olt_adapter_mock" {
+ assert.Equal(t, true, d.Root)
+ assert.NotEqual(t, "", d.Id)
+ assert.NotEqual(t, "", d.ParentId)
+ assert.Nil(t, d.ProxyAddress)
+ } else if d.Type == "onu_adapter_mock" {
+ assert.Equal(t, false, d.Root)
+ assert.NotEqual(t, uint32(0), d.Vlan)
+ assert.NotEqual(t, "", d.Id)
+ assert.NotEqual(t, "", d.ParentId)
+ assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+ assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
} else {
- assert.Error(t, errors.New("invalid-port"))
+ assert.Error(t, errors.New("invalid-device-type"))
}
- }
+ assert.Equal(t, 2, len(d.Ports))
+ for _, p := range d.Ports {
+ assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+ assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+ if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+ assert.Equal(t, 0, len(p.Peers))
+ } else if p.Type == voltha.Port_PON_OLT {
+ assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+ assert.Equal(t, uint32(1), p.PortNo)
+ } else if p.Type == voltha.Port_PON_ONU {
+ assert.Equal(t, 1, len(p.Peers))
+ assert.Equal(t, uint32(1), p.PortNo)
+ } else {
+ assert.Error(t, errors.New("invalid-port"))
+ }
+ }
+ wg.Done()
+ }(&wg, device)
}
+ wg.Wait()
}
func (nb *NBTest) getADevice(rootDevice bool, nbi *APIHandler) (*voltha.Device, error) {
@@ -370,6 +388,11 @@
err = waitUntilConditionForDevices(5*time.Second, nbi, vdFunction)
assert.Nil(t, err)
+ // Create a logical device monitor that will automatically send trap and EAPOL flows to the devices being enabled
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+
// Create the device with valid data
oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
assert.Nil(t, err)
@@ -401,6 +424,9 @@
// Verify that the logical device has been setup correctly
nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ // Wait until all flows have been sent to the devices successfully
+ wg.Wait()
}
func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *APIHandler) {
@@ -430,6 +456,9 @@
// Wait for the logical device to satisfy the expected condition
var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
for _, lp := range ld.Ports {
if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
@@ -462,6 +491,9 @@
// Wait for the logical device to satisfy the expected condition
vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
for _, lp := range ld.Ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -617,6 +649,9 @@
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
var vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
for _, lp := range ld.Ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -645,6 +680,9 @@
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
for _, lp := range ld.Ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -672,7 +710,226 @@
}
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range fa.MatchFields {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+ md := 0
+ md = (md | (cTag & 0xFFFF)) << 16
+ md = (md | (techProfile & 0xFFFF)) << 32
+ return uint64(md | (port & 0xFFFFFFFF))
+}
+
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int) {
+ expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+ // Wait for logical device to have all the flows
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
+ }
+ // No timeout implies a success
+ err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *APIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+ // Send flows for the parent device
+ var nniPorts []*voltha.LogicalPort
+ var uniPorts []*voltha.LogicalPort
+ for _, p := range logicalDevice.Ports {
+ if p.RootPort {
+ nniPorts = append(nniPorts, p)
+ } else {
+ uniPorts = append(uniPorts, p)
+ }
+ }
+ assert.Equal(t, 1, len(nniPorts))
+ //assert.Greater(t, len(uniPorts), 1 )
+ nniPort := nniPorts[0].OfpPort.PortNo
+ maxInt32 := uint64(0xFFFFFFFF)
+ controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+ var fa *flows.FlowArgs
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(35020),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
+ assert.Nil(t, err)
+
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(2048),
+ flows.IpProto(17),
+ flows.UdpSrc(67),
+ flows.UdpDst(68),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
+ assert.Nil(t, err)
+
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(34525),
+ flows.IpProto(17),
+ flows.UdpSrc(546),
+ flows.UdpDst(547),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
+ assert.Nil(t, err)
+
+ return len(nniPorts), len(uniPorts)
+}
+
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *APIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+ maxInt32 := uint64(0xFFFFFFFF)
+ controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(port.PortNo),
+ flows.EthType(34958),
+ flows.VlanVid(8187),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+ _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+ defer wg.Done()
+ if nb.core.logicalDeviceMgr.grpcNbiHdlr != nbi {
+ nb.core.logicalDeviceMgr.setGrpcNbiHandler(nbi)
+ }
+
+ // Clear any existing flows on the adapters
+ nb.oltAdapter.ClearFlows()
+ nb.onuAdapter.ClearFlows()
+
+ // Wait until a logical device is ready
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ if lds == nil || len(lds.Items) != 1 {
+ return false
+ }
+ // Ensure there are both NNI ports and at least one UNI port on the logical device
+ ld := lds.Items[0]
+ nniPort := false
+ uniPort := false
+ for _, p := range ld.Ports {
+ nniPort = nniPort || p.RootPort == true
+ uniPort = uniPort || p.RootPort == false
+ if nniPort && uniPort {
+ return true
+ }
+ }
+ return false
+ }
+ err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ assert.Equal(t, 1, len(logicalDevices.Items))
+
+ logicalDevice := logicalDevices.Items[0]
+ meterID := rand.Uint32()
+
+ // Add a meter to the logical device
+ meterMod := &ofp.OfpMeterMod{
+ Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+ Flags: rand.Uint32(),
+ MeterId: meterID,
+ Bands: []*ofp.OfpMeterBandHeader{
+ {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+ Rate: rand.Uint32(),
+ BurstSize: rand.Uint32(),
+ Data: nil,
+ },
+ },
+ }
+ _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+ assert.Nil(t, err)
+
+ // Send initial set of Trap flows
+ startingVlan := 4091
+ nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+
+ // Listen for port events
+ processedLogicalPorts := 0
+ for event := range nbi.changeEventQueue {
+ startingVlan++
+ if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+ ps := portStatus.PortStatus
+ if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+ processedLogicalPorts++
+ if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+ nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+ }
+ }
+ }
+ if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+ break
+ }
+ }
+ //Verify the flow count on the logical device
+ nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+
+ // Wait until all flows have been sent to the OLT adapters
+ var oltVFunc isConditionSatisfied = func() bool {
+ return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+ }
+ err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
+ assert.Nil(t, err)
+
+ // Wait until all flows have been sent to the ONU adapters
+ var onuVFunc isConditionSatisfied = func() bool {
+ return nb.onuAdapter.GetFlowCount() == numUNIPorts
+ }
+ err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
+ assert.Nil(t, err)
+}
+
func TestSuite1(t *testing.T) {
+ f, err := os.Create("profile.cpu")
+ if err != nil {
+ log.Fatalf("could not create CPU profile: %v\n ", err)
+ }
+ defer f.Close()
+ runtime.SetBlockProfileRate(1)
+ runtime.SetMutexProfileFraction(-1)
+ if err := pprof.StartCPUProfile(f); err != nil {
+ log.Fatalf("could not start CPU profile: %v\n", err)
+ }
+ defer pprof.StopCPUProfile()
+
nb := newNBTest()
assert.NotNil(t, nb)
@@ -695,7 +952,7 @@
numberOfDeviceTestRuns := 2
for i := 1; i <= numberOfDeviceTestRuns; i++ {
- // 3. Test create device
+ //3. Test create device
nb.testCreateDevice(t, nbi)
// 4. Test Enable a device
@@ -703,6 +960,7 @@
// 5. Test disable and ReEnable a root device
nb.testDisableAndReEnableRootDevice(t, nbi)
+
// 6. Test disable and Enable pon port of OLT device
nb.testDisableAndEnablePort(t, nbi)
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 2ab98a3..303bae3 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,6 +20,7 @@
"context"
"fmt"
"strings"
+ "sync"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -31,19 +32,25 @@
)
const (
- numONUPerOLT = 4
+ numONUPerOLT = 4
+ startingUNIPortNo = 100
)
// OLTAdapter represent OLT adapter
type OLTAdapter struct {
+ flows map[uint64]*voltha.OfpFlowStats
+ lock sync.Mutex
Adapter
}
// NewOLTAdapter - creates OLT adapter instance
func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
- a := &OLTAdapter{}
- a.coreProxy = cp
- return a
+ return &OLTAdapter{
+ flows: map[uint64]*voltha.OfpFlowStats{},
+ Adapter: Adapter{
+ coreProxy: cp,
+ },
+ }
}
// Adopt_device creates new handler for added device
@@ -97,7 +104,7 @@
}
// Register Child devices
- initialUniPortNo := 100
+ initialUniPortNo := startingUNIPortNo
for i := 0; i < numONUPerOLT; i++ {
go func(seqNo int) {
if _, err := oltA.coreProxy.ChildDeviceDetected(
@@ -168,6 +175,11 @@
return numONUPerOLT
}
+// GetStartingUNIPortNo returns the starting UNI port number
+func (oltA *OLTAdapter) GetStartingUNIPortNo() int {
+ return startingUNIPortNo
+}
+
// Disable_device disables device
func (oltA *OLTAdapter) Disable_device(device *voltha.Device) error { // nolint
go func() {
@@ -261,3 +273,37 @@
func (oltA *OLTAdapter) Child_device_lost(deviceID string, pPortNo uint32, onuID uint32) error { // nolint
return nil
}
+
+// Update_flows_incrementally mocks the incremental flow update
+func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+ oltA.lock.Lock()
+ defer oltA.lock.Unlock()
+
+ if flows.ToAdd != nil {
+ for _, f := range flows.ToAdd.Items {
+ oltA.flows[f.Id] = f
+ }
+ }
+ if flows.ToRemove != nil {
+ for _, f := range flows.ToRemove.Items {
+ delete(oltA.flows, f.Id)
+ }
+ }
+ return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (oltA *OLTAdapter) GetFlowCount() int {
+ oltA.lock.Lock()
+ defer oltA.lock.Unlock()
+
+ return len(oltA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (oltA *OLTAdapter) ClearFlows() {
+ oltA.lock.Lock()
+ defer oltA.lock.Unlock()
+
+ oltA.flows = map[uint64]*voltha.OfpFlowStats{}
+}
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index ea02210..73ee749 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,6 +20,7 @@
"context"
"fmt"
"strings"
+ "sync"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -32,15 +33,19 @@
// ONUAdapter represent ONU adapter attributes
type ONUAdapter struct {
- coreProxy adapterif.CoreProxy
+ flows map[uint64]*voltha.OfpFlowStats
+ lock sync.Mutex
Adapter
}
// NewONUAdapter creates ONU adapter
func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
- a := &ONUAdapter{}
- a.coreProxy = cp
- return a
+ return &ONUAdapter{
+ flows: map[uint64]*voltha.OfpFlowStats{},
+ Adapter: Adapter{
+ coreProxy: cp,
+ },
+ }
}
// Adopt_device creates new handler for added device
@@ -200,3 +205,37 @@
}()
return nil
}
+
+// Update_flows_incrementally mocks the incremental flow update
+func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+ onuA.lock.Lock()
+ defer onuA.lock.Unlock()
+
+ if flows.ToAdd != nil {
+ for _, f := range flows.ToAdd.Items {
+ onuA.flows[f.Id] = f
+ }
+ }
+ if flows.ToRemove != nil {
+ for _, f := range flows.ToRemove.Items {
+ delete(onuA.flows, f.Id)
+ }
+ }
+ return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (onuA *ONUAdapter) GetFlowCount() int {
+ onuA.lock.Lock()
+ defer onuA.lock.Unlock()
+
+ return len(onuA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (onuA *ONUAdapter) ClearFlows() {
+ onuA.lock.Lock()
+ defer onuA.lock.Unlock()
+
+ onuA.flows = map[uint64]*voltha.OfpFlowStats{}
+}