VOL-2909 - Disaggregating rw_core/core/.

This breaks the core package into logical components. (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" which aggregates all these.

Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
new file mode 100644
index 0000000..3c3b2b0
--- /dev/null
+++ b/rw_core/core/device/logical_agent_test.go
@@ -0,0 +1,616 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package device
+
+import (
+	"context"
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/rw_core/config"
+	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{}
+	updatedLogicalPorts := []*voltha.LogicalPort{}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 0, len(newPorts))
+	assert.Equal(t, 0, len(changedPorts))
+	assert.Equal(t, 0, len(deletedPorts))
+}
+
+func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1233",
+			DeviceId:     "d1234",
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 3,
+				Name:   "port3",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	updatedLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1233",
+			DeviceId:     "d1234",
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 3,
+				Name:   "port3",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 0, len(newPorts))
+	assert.Equal(t, 0, len(changedPorts))
+	assert.Equal(t, 0, len(deletedPorts))
+}
+
+func TestLogicalDeviceAgent_diff_add(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{}
+	updatedLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 2, len(newPorts))
+	assert.Equal(t, 0, len(changedPorts))
+	assert.Equal(t, 0, len(deletedPorts))
+	assert.Equal(t, updatedLogicalPorts[0], newPorts[0])
+	assert.Equal(t, updatedLogicalPorts[1], newPorts[1])
+}
+
+func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	updatedLogicalPorts := []*voltha.LogicalPort{}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 0, len(newPorts))
+	assert.Equal(t, 0, len(changedPorts))
+	assert.Equal(t, 1, len(deletedPorts))
+	assert.Equal(t, currentLogicalPorts[0], deletedPorts[0])
+}
+
+func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1233",
+			DeviceId:     "d1234",
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 3,
+				Name:   "port3",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	updatedLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 4,
+				State:  4,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 4,
+				State:  4,
+			},
+		},
+		{
+			Id:           "1233",
+			DeviceId:     "d1234",
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 3,
+				Name:   "port3",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 0, len(newPorts))
+	assert.Equal(t, 2, len(changedPorts))
+	assert.Equal(t, 0, len(deletedPorts))
+	assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
+	assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
+}
+
+func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
+	currentLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 1,
+				State:  1,
+			},
+		},
+		{
+			Id:           "1233",
+			DeviceId:     "d1234",
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 3,
+				Name:   "port3",
+				Config: 1,
+				State:  1,
+			},
+		},
+	}
+	updatedLogicalPorts := []*voltha.LogicalPort{
+		{
+			Id:           "1231",
+			DeviceId:     "d1234",
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 1,
+				Name:   "port1",
+				Config: 4,
+				State:  4,
+			},
+		},
+		{
+			Id:           "1232",
+			DeviceId:     "d1234",
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 2,
+				Name:   "port2",
+				Config: 4,
+				State:  4,
+			},
+		},
+		{
+			Id:           "1234",
+			DeviceId:     "d1234",
+			DevicePortNo: 4,
+			RootPort:     false,
+			OfpPort: &ofp.OfpPort{
+				PortNo: 4,
+				Name:   "port4",
+				Config: 4,
+				State:  4,
+			},
+		},
+	}
+	newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+	assert.Equal(t, 1, len(newPorts))
+	assert.Equal(t, 2, len(changedPorts))
+	assert.Equal(t, 1, len(deletedPorts))
+	assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
+	assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
+	assert.Equal(t, currentLogicalPorts[2], deletedPorts[0])
+}
+
+type LDATest struct {
+	etcdServer       *mock_etcd.EtcdServer
+	deviceMgr        *Manager
+	kmp              kafka.InterContainerProxy
+	logicalDeviceMgr *LogicalManager
+	kClient          kafka.Client
+	kvClientPort     int
+	oltAdapterName   string
+	onuAdapterName   string
+	coreInstanceID   string
+	defaultTimeout   time.Duration
+	maxTimeout       time.Duration
+	logicalDevice    *voltha.LogicalDevice
+	deviceIds        []string
+	done             chan int
+}
+
+func newLDATest() *LDATest {
+	test := &LDATest{}
+	// Start the embedded etcd server
+	var err error
+	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
+	if err != nil {
+		logger.Fatal(err)
+	}
+	// Create the kafka client
+	test.kClient = mock_kafka.NewKafkaClient()
+	test.oltAdapterName = "olt_adapter_mock"
+	test.onuAdapterName = "onu_adapter_mock"
+	test.coreInstanceID = "rw-da-test"
+	test.defaultTimeout = 5 * time.Second
+	test.maxTimeout = 20 * time.Second
+	test.done = make(chan int)
+	test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
+	test.logicalDevice = &voltha.LogicalDevice{
+		Desc: &ofp.OfpDesc{
+			HwDesc:    "olt_adapter_mock",
+			SwDesc:    "olt_adapter_mock",
+			SerialNum: com.GetRandomSerialNumber(),
+		},
+		SwitchFeatures: &ofp.OfpSwitchFeatures{
+			NBuffers: 256,
+			NTables:  2,
+			Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
+				ofp.OfpCapabilities_OFPC_TABLE_STATS |
+				ofp.OfpCapabilities_OFPC_PORT_STATS |
+				ofp.OfpCapabilities_OFPC_GROUP_STATS),
+		},
+		RootDeviceId: test.deviceIds[0],
+		Ports: []*voltha.LogicalPort{
+			{
+				Id:           "1001",
+				DeviceId:     test.deviceIds[0],
+				DevicePortNo: 1,
+				RootPort:     true,
+				OfpPort: &ofp.OfpPort{
+					PortNo: 1,
+					Name:   "port1",
+					Config: 4,
+					State:  4,
+				},
+			},
+			{
+				Id:           "1002",
+				DeviceId:     test.deviceIds[1],
+				DevicePortNo: 2,
+				RootPort:     false,
+				OfpPort: &ofp.OfpPort{
+					PortNo: 2,
+					Name:   "port2",
+					Config: 4,
+					State:  4,
+				},
+			},
+			{
+				Id:           "1003",
+				DeviceId:     test.deviceIds[2],
+				DevicePortNo: 3,
+				RootPort:     false,
+				OfpPort: &ofp.OfpPort{
+					PortNo: 4,
+					Name:   "port3",
+					Config: 4,
+					State:  4,
+				},
+			},
+		},
+	}
+	return test
+}
+
+func (lda *LDATest) startCore(inCompeteMode bool) {
+	cfg := config.NewRWCoreFlags()
+	cfg.CorePairTopic = "rw_core"
+	cfg.DefaultRequestTimeout = lda.defaultTimeout
+	cfg.KVStorePort = lda.kvClientPort
+	cfg.InCompetingMode = inCompeteMode
+	grpcPort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal("Cannot get a freeport for grpc")
+	}
+	cfg.GrpcPort = grpcPort
+	cfg.GrpcHost = "127.0.0.1"
+	client := setupKVClient(cfg, lda.coreInstanceID)
+	backend := &db.Backend{
+		Client:                  client,
+		StoreType:               cfg.KVStoreType,
+		Host:                    cfg.KVStoreHost,
+		Port:                    cfg.KVStorePort,
+		Timeout:                 cfg.KVStoreTimeout,
+		LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+		PathPrefix:              cfg.KVStoreDataPrefix}
+	lda.kmp = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(cfg.KafkaAdapterHost),
+		kafka.InterContainerPort(cfg.KafkaAdapterPort),
+		kafka.MsgClient(lda.kClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+	endpointMgr := kafka.NewEndpointManager(backend)
+	proxy := model.NewProxy(backend, "/")
+	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
+
+	lda.deviceMgr, lda.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+	lda.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+	if err = lda.kmp.Start(); err != nil {
+		logger.Fatal("Cannot start InterContainerProxy")
+	}
+	if err = adapterMgr.Start(context.Background()); err != nil {
+		logger.Fatal("Cannot start adapterMgr")
+	}
+	lda.deviceMgr.Start(context.Background())
+	lda.logicalDeviceMgr.Start(context.Background())
+}
+
+func (lda *LDATest) stopAll() {
+	if lda.kClient != nil {
+		lda.kClient.Stop()
+	}
+	if lda.logicalDeviceMgr != nil {
+		lda.logicalDeviceMgr.Stop(context.Background())
+	}
+	if lda.deviceMgr != nil {
+		lda.deviceMgr.Stop(context.Background())
+	}
+	if lda.kmp != nil {
+		lda.kmp.Stop()
+	}
+	if lda.etcdServer != nil {
+		stopEmbeddedEtcdServer(lda.etcdServer)
+	}
+}
+
+func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
+	lDeviceMgr := lda.logicalDeviceMgr
+	deviceMgr := lda.deviceMgr
+	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
+	clonedLD.Id = com.GetRandomString(10)
+	clonedLD.DatapathId = rand.Uint64()
+	lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+	lDeviceAgent.logicalDevice = clonedLD
+	err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
+	assert.Nil(t, err)
+	lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
+	return lDeviceAgent
+}
+
+func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
+	originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
+	assert.NotNil(t, originalLogicalDevice)
+	var localWG sync.WaitGroup
+
+	// Change the state of the first port to FAILED
+	localWG.Add(1)
+	go func() {
+		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// Change the state of the second port to TESTING
+	localWG.Add(1)
+	go func() {
+		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// Change the state of the third port to UNKNOWN and then back to ACTIVE
+	localWG.Add(1)
+	go func() {
+		err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
+		assert.Nil(t, err)
+		err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: rand.Uint32(),
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	localWG.Add(1)
+	go func() {
+		err := ldAgent.meterAdd(context.Background(), meterMod)
+		assert.Nil(t, err)
+		localWG.Done()
+	}()
+
+	// wait for go routines to be done
+	localWG.Wait()
+
+	expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
+	expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+	expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+	expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+	expectedChange.Meters = &voltha.Meters{Items: nil}
+	expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
+	updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
+	assert.NotNil(t, updatedLogicalDevice)
+	assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
+	globalWG.Done()
+}
+
+func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
+	lda := newLDATest()
+	assert.NotNil(t, lda)
+	defer lda.stopAll()
+
+	// Start the Core
+	lda.startCore(false)
+
+	var wg sync.WaitGroup
+	numConCurrentLogicalDeviceAgents := 3
+	for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
+		wg.Add(1)
+		a := lda.createLogicalDeviceAgent(t)
+		go lda.updateLogicalDeviceConcurrently(t, a, &wg)
+	}
+
+	wg.Wait()
+}