[VOL-2688] Improve core model performance

This commit addresses the low-hanging performance hogs in the
core model.  In particular, the following changes are made:

1) Remove proto message comparision when it's possible.  The proto
message deep comparison is quite expensive.
2) Since the Core already has a lock on the device/logicaldevice/
adapters/etc before invoking the model proxy then there is no
need for the latter to create an additional lock on these artifacts
duting an update
3) The model creates a watch on every artifacts it adds to the KV
store.   Since in the next Voltha release we will not be using Voltha
Core in pairs then there is no point in keeping these watches (these
is only 1 Core that will ever update an artifact in the next
deployment).  This update removes these watch.
4) Additional unit tests has been created, mostly around flows, in an
attempt to exercise both the core and the model further.

Change-Id: Ieaf1f6b9b05c56e819600bc55b46a05f73b8efcf
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 346198d..d07a095 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -57,6 +57,7 @@
 type isDeviceConditionSatisfied func(ld *voltha.Device) bool
 type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
 type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
+type isConditionSatisfied func() bool
 
 func init() {
 	_, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
@@ -193,7 +194,7 @@
 			d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
 			if d != nil && d.ParentId != "" {
 				ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
-				if ld != nil && verificationFunction(ld) {
+				if verificationFunction(ld) {
 					ch <- 1
 					break
 				}
@@ -270,3 +271,29 @@
 		return fmt.Errorf("timeout-waiting-logical-devices")
 	}
 }
+
+func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
+	ch := make(chan int, 1)
+	done := false
+	go func() {
+		for {
+			if verificationFunction() {
+				ch <- 1
+				break
+			}
+			if done {
+				break
+			}
+			time.Sleep(retryInterval)
+		}
+	}()
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case <-ch:
+		return nil
+	case <-timer.C:
+		done = true
+		return fmt.Errorf("timeout-waiting-for-condition")
+	}
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 877d991..42d628a 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1457,7 +1457,8 @@
 
 	//Remove the associated peer ports on the parent device
 	if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
-		return err
+		// At this stage, the parent device may also have been deleted.  Just log and keep processing.
+		log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
 	}
 
 	if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 1abc565..6122b51 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -19,7 +19,13 @@
 	"context"
 	"errors"
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"math/rand"
+	"os"
+	"runtime"
+	"runtime/pprof"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -39,16 +45,19 @@
 )
 
 type NBTest struct {
-	etcdServer     *lm.EtcdServer
-	core           *Core
-	kClient        kafka.Client
-	kvClientPort   int
-	numONUPerOLT   int
-	oltAdapterName string
-	onuAdapterName string
-	coreInstanceID string
-	defaultTimeout time.Duration
-	maxTimeout     time.Duration
+	etcdServer        *lm.EtcdServer
+	core              *Core
+	kClient           kafka.Client
+	kvClientPort      int
+	numONUPerOLT      int
+	startingUNIPortNo int
+	oltAdapter        *cm.OLTAdapter
+	onuAdapter        *cm.ONUAdapter
+	oltAdapterName    string
+	onuAdapterName    string
+	coreInstanceID    string
+	defaultTimeout    time.Duration
+	maxTimeout        time.Duration
 }
 
 func newNBTest() *NBTest {
@@ -64,8 +73,8 @@
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
-	test.defaultTimeout = 10 * time.Second
-	test.maxTimeout = 20 * time.Second
+	test.defaultTimeout = 50 * time.Second
+	test.maxTimeout = 300 * time.Second
 	return test
 }
 
@@ -99,9 +108,10 @@
 	if err != nil {
 		log.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
 	}
-	if adapter, ok := (oltAdapter).(*cm.OLTAdapter); ok {
-		nb.numONUPerOLT = adapter.GetNumONUPerOLT()
-	}
+	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
 		Id:      nb.oltAdapterName,
@@ -116,9 +126,12 @@
 	}
 
 	// Setup the mock ONU adapter
-	if _, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName); err != nil {
+	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+	if err != nil {
 		log.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
 	}
+	nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
+
 	//	Register the adapter
 	registrationData = &voltha.Adapter{
 		Id:      nb.onuAdapterName,
@@ -191,59 +204,64 @@
 	assert.Nil(t, err)
 	assert.NotNil(t, devices)
 
-	// Wait until devices are in the correct states
+	// A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
 	var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
 		return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
 	}
-	for _, d := range devices.Items {
-		err = waitUntilDeviceReadiness(d.Id, nb.maxTimeout, vFunction, nbi)
-		assert.Nil(t, err)
-		assert.NotNil(t, d)
-	}
-	// Get the latest device updates as they may have changed since last list devices
-	updatedDevices, err := nbi.ListDevices(getContext(), &empty.Empty{})
-	assert.Nil(t, err)
-	assert.NotNil(t, devices)
-	for _, d := range updatedDevices.Items {
-		assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
-		assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
-		assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
-		assert.Equal(t, d.Type, d.Adapter)
-		assert.NotEqual(t, "", d.MacAddress)
-		assert.NotEqual(t, "", d.SerialNumber)
 
-		if d.Type == "olt_adapter_mock" {
-			assert.Equal(t, true, d.Root)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.Nil(t, d.ProxyAddress)
-		} else if d.Type == "onu_adapter_mock" {
-			assert.Equal(t, false, d.Root)
-			assert.NotEqual(t, uint32(0), d.Vlan)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
-			assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
-		} else {
-			assert.Error(t, errors.New("invalid-device-type"))
-		}
-		assert.Equal(t, 2, len(d.Ports))
-		for _, p := range d.Ports {
-			assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
-			assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
-			if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
-				assert.Equal(t, 0, len(p.Peers))
-			} else if p.Type == voltha.Port_PON_OLT {
-				assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
-			} else if p.Type == voltha.Port_PON_ONU {
-				assert.Equal(t, 1, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
+	var wg sync.WaitGroup
+	for _, device := range devices.Items {
+		wg.Add(1)
+		go func(wg *sync.WaitGroup, device *voltha.Device) {
+			// Wait until the device is in the right state
+			err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+			assert.Nil(t, err)
+
+			// Now, verify the details of the device.  First get the latest update
+			d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+			assert.Nil(t, err)
+			assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+			assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+			assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+			assert.Equal(t, d.Type, d.Adapter)
+			assert.NotEqual(t, "", d.MacAddress)
+			assert.NotEqual(t, "", d.SerialNumber)
+
+			if d.Type == "olt_adapter_mock" {
+				assert.Equal(t, true, d.Root)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.Nil(t, d.ProxyAddress)
+			} else if d.Type == "onu_adapter_mock" {
+				assert.Equal(t, false, d.Root)
+				assert.NotEqual(t, uint32(0), d.Vlan)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+				assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
 			} else {
-				assert.Error(t, errors.New("invalid-port"))
+				assert.Error(t, errors.New("invalid-device-type"))
 			}
-		}
+			assert.Equal(t, 2, len(d.Ports))
+			for _, p := range d.Ports {
+				assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+				assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+				if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+					assert.Equal(t, 0, len(p.Peers))
+				} else if p.Type == voltha.Port_PON_OLT {
+					assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else if p.Type == voltha.Port_PON_ONU {
+					assert.Equal(t, 1, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else {
+					assert.Error(t, errors.New("invalid-port"))
+				}
+			}
+			wg.Done()
+		}(&wg, device)
 	}
+	wg.Wait()
 }
 
 func (nb *NBTest) getADevice(rootDevice bool, nbi *APIHandler) (*voltha.Device, error) {
@@ -370,6 +388,11 @@
 	err = waitUntilConditionForDevices(5*time.Second, nbi, vdFunction)
 	assert.Nil(t, err)
 
+	// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+
 	//	Create the device with valid data
 	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
 	assert.Nil(t, err)
@@ -401,6 +424,9 @@
 
 	// Verify that the logical device has been setup correctly
 	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	// Wait until all flows has been sent to the devices successfully
+	wg.Wait()
 }
 
 func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *APIHandler) {
@@ -430,6 +456,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
@@ -462,6 +491,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -617,6 +649,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -645,6 +680,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -672,7 +710,226 @@
 
 }
 
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+	matchFields := make([]*ofp.OfpOxmField, 0)
+	for _, val := range fa.MatchFields {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+	md := 0
+	md = (md | (cTag & 0xFFFF)) << 16
+	md = (md | (techProfile & 0xFFFF)) << 32
+	return uint64(md | (port & 0xFFFFFFFF))
+}
+
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int) {
+	expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+	// Wait for logical device to have all the flows
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
+	}
+	// No timeout implies a success
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *APIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+	// Send flows for the parent device
+	var nniPorts []*voltha.LogicalPort
+	var uniPorts []*voltha.LogicalPort
+	for _, p := range logicalDevice.Ports {
+		if p.RootPort {
+			nniPorts = append(nniPorts, p)
+		} else {
+			uniPorts = append(uniPorts, p)
+		}
+	}
+	assert.Equal(t, 1, len(nniPorts))
+	//assert.Greater(t, len(uniPorts), 1 )
+	nniPort := nniPorts[0].OfpPort.PortNo
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	var fa *flows.FlowArgs
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(2048),
+			flows.IpProto(17),
+			flows.UdpSrc(67),
+			flows.UdpDst(68),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(34525),
+			flows.IpProto(17),
+			flows.UdpSrc(546),
+			flows.UdpDst(547),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
+	assert.Nil(t, err)
+
+	return len(nniPorts), len(uniPorts)
+}
+
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *APIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(port.PortNo),
+			flows.EthType(34958),
+			flows.VlanVid(8187),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if nb.core.logicalDeviceMgr.grpcNbiHdlr != nbi {
+		nb.core.logicalDeviceMgr.setGrpcNbiHandler(nbi)
+	}
+
+	// Clear any existing flows on the adapters
+	nb.oltAdapter.ClearFlows()
+	nb.onuAdapter.ClearFlows()
+
+	// Wait until a logical device is ready
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		if lds == nil || len(lds.Items) != 1 {
+			return false
+		}
+		// Ensure there are both NNI ports and at least one UNI port on the logical device
+		ld := lds.Items[0]
+		nniPort := false
+		uniPort := false
+		for _, p := range ld.Ports {
+			nniPort = nniPort || p.RootPort == true
+			uniPort = uniPort || p.RootPort == false
+			if nniPort && uniPort {
+				return true
+			}
+		}
+		return false
+	}
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, logicalDevices)
+	assert.Equal(t, 1, len(logicalDevices.Items))
+
+	logicalDevice := logicalDevices.Items[0]
+	meterID := rand.Uint32()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: meterID,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+	assert.Nil(t, err)
+
+	// Send initial set of Trap flows
+	startingVlan := 4091
+	nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+
+	// Listen for port events
+	processedLogicalPorts := 0
+	for event := range nbi.changeEventQueue {
+		startingVlan++
+		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+			ps := portStatus.PortStatus
+			if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+				processedLogicalPorts++
+				if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+					nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+				}
+			}
+		}
+		if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+			break
+		}
+	}
+	//Verify the flow count on the logical device
+	nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+
+	// Wait until all flows have been sent to the OLT adapters
+	var oltVFunc isConditionSatisfied = func() bool {
+		return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
+	assert.Nil(t, err)
+
+	// Wait until all flows have been sent to the ONU adapters
+	var onuVFunc isConditionSatisfied = func() bool {
+		return nb.onuAdapter.GetFlowCount() == numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
+	assert.Nil(t, err)
+}
+
 func TestSuite1(t *testing.T) {
+	f, err := os.Create("profile.cpu")
+	if err != nil {
+		log.Fatalf("could not create CPU profile: %v\n ", err)
+	}
+	defer f.Close()
+	runtime.SetBlockProfileRate(1)
+	runtime.SetMutexProfileFraction(-1)
+	if err := pprof.StartCPUProfile(f); err != nil {
+		log.Fatalf("could not start CPU profile: %v\n", err)
+	}
+	defer pprof.StopCPUProfile()
+
 	nb := newNBTest()
 	assert.NotNil(t, nb)
 
@@ -695,7 +952,7 @@
 
 	numberOfDeviceTestRuns := 2
 	for i := 1; i <= numberOfDeviceTestRuns; i++ {
-		// 3. Test create device
+		//3. Test create device
 		nb.testCreateDevice(t, nbi)
 
 		// 4. Test Enable a device
@@ -703,6 +960,7 @@
 
 		// 5. Test disable and ReEnable a root device
 		nb.testDisableAndReEnableRootDevice(t, nbi)
+
 		// 6. Test disable and Enable pon port of OLT device
 		nb.testDisableAndEnablePort(t, nbi)
 
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 2ab98a3..303bae3 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,6 +20,7 @@
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -31,19 +32,25 @@
 )
 
 const (
-	numONUPerOLT = 4
+	numONUPerOLT      = 4
+	startingUNIPortNo = 100
 )
 
 // OLTAdapter represent OLT adapter
 type OLTAdapter struct {
+	flows map[uint64]*voltha.OfpFlowStats
+	lock  sync.Mutex
 	Adapter
 }
 
 // NewOLTAdapter - creates OLT adapter instance
 func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
-	a := &OLTAdapter{}
-	a.coreProxy = cp
-	return a
+	return &OLTAdapter{
+		flows: map[uint64]*voltha.OfpFlowStats{},
+		Adapter: Adapter{
+			coreProxy: cp,
+		},
+	}
 }
 
 // Adopt_device creates new handler for added device
@@ -97,7 +104,7 @@
 		}
 
 		// Register Child devices
-		initialUniPortNo := 100
+		initialUniPortNo := startingUNIPortNo
 		for i := 0; i < numONUPerOLT; i++ {
 			go func(seqNo int) {
 				if _, err := oltA.coreProxy.ChildDeviceDetected(
@@ -168,6 +175,11 @@
 	return numONUPerOLT
 }
 
+// Returns the starting UNI port number
+func (oltA *OLTAdapter) GetStartingUNIPortNo() int {
+	return startingUNIPortNo
+}
+
 // Disable_device disables device
 func (oltA *OLTAdapter) Disable_device(device *voltha.Device) error { // nolint
 	go func() {
@@ -261,3 +273,37 @@
 func (oltA *OLTAdapter) Child_device_lost(deviceID string, pPortNo uint32, onuID uint32) error { // nolint
 	return nil
 }
+
+// Update_flows_incrementally mocks the incremental flow update
+func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	if flows.ToAdd != nil {
+		for _, f := range flows.ToAdd.Items {
+			oltA.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		for _, f := range flows.ToRemove.Items {
+			delete(oltA.flows, f.Id)
+		}
+	}
+	return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (oltA *OLTAdapter) GetFlowCount() int {
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	return len(oltA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (oltA *OLTAdapter) ClearFlows() {
+	oltA.lock.Lock()
+	defer oltA.lock.Unlock()
+
+	oltA.flows = map[uint64]*voltha.OfpFlowStats{}
+}
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index ea02210..73ee749 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,6 +20,7 @@
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
@@ -32,15 +33,19 @@
 
 // ONUAdapter represent ONU adapter attributes
 type ONUAdapter struct {
-	coreProxy adapterif.CoreProxy
+	flows map[uint64]*voltha.OfpFlowStats
+	lock  sync.Mutex
 	Adapter
 }
 
 // NewONUAdapter creates ONU adapter
 func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
-	a := &ONUAdapter{}
-	a.coreProxy = cp
-	return a
+	return &ONUAdapter{
+		flows: map[uint64]*voltha.OfpFlowStats{},
+		Adapter: Adapter{
+			coreProxy: cp,
+		},
+	}
 }
 
 // Adopt_device creates new handler for added device
@@ -200,3 +205,37 @@
 	}()
 	return nil
 }
+
+// Update_flows_incrementally mocks the incremental flow update
+func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	if flows.ToAdd != nil {
+		for _, f := range flows.ToAdd.Items {
+			onuA.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		for _, f := range flows.ToRemove.Items {
+			delete(onuA.flows, f.Id)
+		}
+	}
+	return nil
+}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (onuA *ONUAdapter) GetFlowCount() int {
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	return len(onuA.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (onuA *ONUAdapter) ClearFlows() {
+	onuA.lock.Lock()
+	defer onuA.lock.Unlock()
+
+	onuA.flows = map[uint64]*voltha.OfpFlowStats{}
+}