[VOL-2688] Improve core model performance

This commit addresses the low-hanging performance hogs in the
core model.  In particular, the following changes are made:

1) Remove proto message comparison where possible.  The proto
message deep comparison is quite expensive.
2) Since the Core already holds a lock on the device/logicaldevice/
adapters/etc. before invoking the model proxy, there is no need for
the latter to create an additional lock on these artifacts during
an update.
3) The model creates a watch on every artifact it adds to the KV
store.  Since the next Voltha release will not run Voltha Cores in
pairs, there is no point in keeping these watches (there is only
one Core that will ever update an artifact in the next deployment).
This update removes these watches.
4) Additional unit tests have been created, mostly around flows, in
an attempt to exercise both the core and the model further.

Change-Id: Ieaf1f6b9b05c56e819600bc55b46a05f73b8efcf
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 1abc565..6122b51 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -19,7 +19,13 @@
 	"context"
 	"errors"
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"math/rand"
+	"os"
+	"runtime"
+	"runtime/pprof"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -39,16 +45,19 @@
 )
 
 type NBTest struct {
-	etcdServer     *lm.EtcdServer
-	core           *Core
-	kClient        kafka.Client
-	kvClientPort   int
-	numONUPerOLT   int
-	oltAdapterName string
-	onuAdapterName string
-	coreInstanceID string
-	defaultTimeout time.Duration
-	maxTimeout     time.Duration
+	etcdServer        *lm.EtcdServer
+	core              *Core
+	kClient           kafka.Client
+	kvClientPort      int
+	numONUPerOLT      int
+	startingUNIPortNo int
+	oltAdapter        *cm.OLTAdapter
+	onuAdapter        *cm.ONUAdapter
+	oltAdapterName    string
+	onuAdapterName    string
+	coreInstanceID    string
+	defaultTimeout    time.Duration
+	maxTimeout        time.Duration
 }
 
 func newNBTest() *NBTest {
@@ -64,8 +73,8 @@
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
-	test.defaultTimeout = 10 * time.Second
-	test.maxTimeout = 20 * time.Second
+	test.defaultTimeout = 50 * time.Second
+	test.maxTimeout = 300 * time.Second
 	return test
 }
 
@@ -99,9 +108,10 @@
 	if err != nil {
 		log.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
 	}
-	if adapter, ok := (oltAdapter).(*cm.OLTAdapter); ok {
-		nb.numONUPerOLT = adapter.GetNumONUPerOLT()
-	}
+	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
 		Id:      nb.oltAdapterName,
@@ -116,9 +126,12 @@
 	}
 
 	// Setup the mock ONU adapter
-	if _, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName); err != nil {
+	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+	if err != nil {
 		log.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
 	}
+	nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
+
 	//	Register the adapter
 	registrationData = &voltha.Adapter{
 		Id:      nb.onuAdapterName,
@@ -191,59 +204,64 @@
 	assert.Nil(t, err)
 	assert.NotNil(t, devices)
 
-	// Wait until devices are in the correct states
+	// A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
 	var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
 		return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
 	}
-	for _, d := range devices.Items {
-		err = waitUntilDeviceReadiness(d.Id, nb.maxTimeout, vFunction, nbi)
-		assert.Nil(t, err)
-		assert.NotNil(t, d)
-	}
-	// Get the latest device updates as they may have changed since last list devices
-	updatedDevices, err := nbi.ListDevices(getContext(), &empty.Empty{})
-	assert.Nil(t, err)
-	assert.NotNil(t, devices)
-	for _, d := range updatedDevices.Items {
-		assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
-		assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
-		assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
-		assert.Equal(t, d.Type, d.Adapter)
-		assert.NotEqual(t, "", d.MacAddress)
-		assert.NotEqual(t, "", d.SerialNumber)
 
-		if d.Type == "olt_adapter_mock" {
-			assert.Equal(t, true, d.Root)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.Nil(t, d.ProxyAddress)
-		} else if d.Type == "onu_adapter_mock" {
-			assert.Equal(t, false, d.Root)
-			assert.NotEqual(t, uint32(0), d.Vlan)
-			assert.NotEqual(t, "", d.Id)
-			assert.NotEqual(t, "", d.ParentId)
-			assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
-			assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
-		} else {
-			assert.Error(t, errors.New("invalid-device-type"))
-		}
-		assert.Equal(t, 2, len(d.Ports))
-		for _, p := range d.Ports {
-			assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
-			assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
-			if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
-				assert.Equal(t, 0, len(p.Peers))
-			} else if p.Type == voltha.Port_PON_OLT {
-				assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
-			} else if p.Type == voltha.Port_PON_ONU {
-				assert.Equal(t, 1, len(p.Peers))
-				assert.Equal(t, uint32(1), p.PortNo)
+	var wg sync.WaitGroup
+	for _, device := range devices.Items {
+		wg.Add(1)
+		go func(wg *sync.WaitGroup, device *voltha.Device) {
+			// Wait until the device is in the right state
+			err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+			assert.Nil(t, err)
+
+			// Now, verify the details of the device.  First get the latest update
+			d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+			assert.Nil(t, err)
+			assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+			assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+			assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+			assert.Equal(t, d.Type, d.Adapter)
+			assert.NotEqual(t, "", d.MacAddress)
+			assert.NotEqual(t, "", d.SerialNumber)
+
+			if d.Type == "olt_adapter_mock" {
+				assert.Equal(t, true, d.Root)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.Nil(t, d.ProxyAddress)
+			} else if d.Type == "onu_adapter_mock" {
+				assert.Equal(t, false, d.Root)
+				assert.NotEqual(t, uint32(0), d.Vlan)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+				assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
 			} else {
-				assert.Error(t, errors.New("invalid-port"))
+				assert.Error(t, errors.New("invalid-device-type"))
 			}
-		}
+			assert.Equal(t, 2, len(d.Ports))
+			for _, p := range d.Ports {
+				assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+				assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+				if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+					assert.Equal(t, 0, len(p.Peers))
+				} else if p.Type == voltha.Port_PON_OLT {
+					assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else if p.Type == voltha.Port_PON_ONU {
+					assert.Equal(t, 1, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				} else {
+					assert.Error(t, errors.New("invalid-port"))
+				}
+			}
+			wg.Done()
+		}(&wg, device)
 	}
+	wg.Wait()
 }
 
 func (nb *NBTest) getADevice(rootDevice bool, nbi *APIHandler) (*voltha.Device, error) {
@@ -370,6 +388,11 @@
 	err = waitUntilConditionForDevices(5*time.Second, nbi, vdFunction)
 	assert.Nil(t, err)
 
+	// Create a logical device monitor that will automatically send trap and EAPOL flows to the devices being enabled
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+
 	//	Create the device with valid data
 	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
 	assert.Nil(t, err)
@@ -401,6 +424,9 @@
 
 	// Verify that the logical device has been setup correctly
 	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	// Wait until all flows have been sent to the devices successfully
+	wg.Wait()
 }
 
 func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *APIHandler) {
@@ -430,6 +456,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
@@ -462,6 +491,9 @@
 
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -617,6 +649,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	var vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -645,6 +680,9 @@
 	assert.Nil(t, err)
 	// Wait for the logical device to satisfy the expected condition
 	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		if ld == nil {
+			return false
+		}
 		for _, lp := range ld.Ports {
 			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
 				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
@@ -672,7 +710,226 @@
 
 }
 
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+	matchFields := make([]*ofp.OfpOxmField, 0)
+	for _, val := range fa.MatchFields {
+		matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+	}
+	return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+	md := 0
+	md = (md | (cTag & 0xFFFF)) << 16
+	md = (md | (techProfile & 0xFFFF)) << 32
+	return uint64(md | (port & 0xFFFFFFFF))
+}
+
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int) {
+	expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+	// Wait for logical device to have all the flows
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
+	}
+	// No timeout implies a success
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *APIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+	// Send flows for the parent device
+	var nniPorts []*voltha.LogicalPort
+	var uniPorts []*voltha.LogicalPort
+	for _, p := range logicalDevice.Ports {
+		if p.RootPort {
+			nniPorts = append(nniPorts, p)
+		} else {
+			uniPorts = append(uniPorts, p)
+		}
+	}
+	assert.Equal(t, 1, len(nniPorts))
+	//assert.Greater(t, len(uniPorts), 1 )
+	nniPort := nniPorts[0].OfpPort.PortNo
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	var fa *flows.FlowArgs
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(2048),
+			flows.IpProto(17),
+			flows.UdpSrc(67),
+			flows.UdpDst(68),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
+	assert.Nil(t, err)
+
+	fa = &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(nniPort),
+			flows.EthType(34525),
+			flows.IpProto(17),
+			flows.UdpSrc(546),
+			flows.UdpDst(547),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+	_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
+	assert.Nil(t, err)
+
+	return len(nniPorts), len(uniPorts)
+}
+
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *APIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+	maxInt32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+	fa := &flows.FlowArgs{
+		KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			flows.InPort(port.PortNo),
+			flows.EthType(34958),
+			flows.VlanVid(8187),
+		},
+		Actions: []*ofp.OfpAction{
+			flows.Output(controllerPortMask),
+		},
+	}
+	flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+	_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+	assert.Nil(t, err)
+}
+
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if nb.core.logicalDeviceMgr.grpcNbiHdlr != nbi {
+		nb.core.logicalDeviceMgr.setGrpcNbiHandler(nbi)
+	}
+
+	// Clear any existing flows on the adapters
+	nb.oltAdapter.ClearFlows()
+	nb.onuAdapter.ClearFlows()
+
+	// Wait until a logical device is ready
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		if lds == nil || len(lds.Items) != 1 {
+			return false
+		}
+		// Ensure there is at least one NNI (root) port and at least one UNI port on the logical device
+		ld := lds.Items[0]
+		nniPort := false
+		uniPort := false
+		for _, p := range ld.Ports {
+			nniPort = nniPort || p.RootPort == true
+			uniPort = uniPort || p.RootPort == false
+			if nniPort && uniPort {
+				return true
+			}
+		}
+		return false
+	}
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, logicalDevices)
+	assert.Equal(t, 1, len(logicalDevices.Items))
+
+	logicalDevice := logicalDevices.Items[0]
+	meterID := rand.Uint32()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: meterID,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+	assert.Nil(t, err)
+
+	// Send initial set of Trap flows
+	startingVlan := 4091
+	nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+
+	// Listen for port events
+	processedLogicalPorts := 0
+	for event := range nbi.changeEventQueue {
+		startingVlan++
+		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+			ps := portStatus.PortStatus
+			if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+				processedLogicalPorts++
+				if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+					nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+				}
+			}
+		}
+		if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+			break
+		}
+	}
+	//Verify the flow count on the logical device
+	nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+
+	// Wait until all flows have been sent to the OLT adapters
+	var oltVFunc isConditionSatisfied = func() bool {
+		return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
+	assert.Nil(t, err)
+
+	// Wait until all flows have been sent to the ONU adapters
+	var onuVFunc isConditionSatisfied = func() bool {
+		return nb.onuAdapter.GetFlowCount() == numUNIPorts
+	}
+	err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
+	assert.Nil(t, err)
+}
+
 func TestSuite1(t *testing.T) {
+	f, err := os.Create("profile.cpu")
+	if err != nil {
+		log.Fatalf("could not create CPU profile: %v\n ", err)
+	}
+	defer f.Close()
+	runtime.SetBlockProfileRate(1)
+	runtime.SetMutexProfileFraction(-1)
+	if err := pprof.StartCPUProfile(f); err != nil {
+		log.Fatalf("could not start CPU profile: %v\n", err)
+	}
+	defer pprof.StopCPUProfile()
+
 	nb := newNBTest()
 	assert.NotNil(t, nb)
 
@@ -695,7 +952,7 @@
 
 	numberOfDeviceTestRuns := 2
 	for i := 1; i <= numberOfDeviceTestRuns; i++ {
-		// 3. Test create device
+		//3. Test create device
 		nb.testCreateDevice(t, nbi)
 
 		// 4. Test Enable a device
@@ -703,6 +960,7 @@
 
 		// 5. Test disable and ReEnable a root device
 		nb.testDisableAndReEnableRootDevice(t, nbi)
+
 		// 6. Test disable and Enable pon port of OLT device
 		nb.testDisableAndEnablePort(t, nbi)