[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index e3716a7..0577c1e 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,7 +18,6 @@
 
 import (
 	"context"
-	"fmt"
 	"math/rand"
 	"sort"
 	"strconv"
@@ -27,21 +26,23 @@
 	"testing"
 	"time"
 
+	ver "github.com/opencord/voltha-lib-go/v7/pkg/version"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
-	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	tst "github.com/opencord/voltha-go/rw_core/test"
-	com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
-	mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
 )
@@ -51,17 +52,13 @@
 	deviceMgr        *Manager
 	logicalDeviceMgr *LogicalManager
 	adapterMgr       *adapter.Manager
-	kmp              kafka.InterContainerProxy
 	kClient          kafka.Client
 	kEventClient     kafka.Client
 	kvClientPort     int
-	oltAdapter       *cm.OLTAdapter
-	onuAdapter       *cm.ONUAdapter
 	oltAdapterName   string
 	onuAdapterName   string
 	coreInstanceID   string
-	defaultTimeout   time.Duration
-	maxTimeout       time.Duration
+	internalTimeout  time.Duration
 	device           *voltha.Device
 	devicePorts      map[uint32]*voltha.Port
 	done             chan int
@@ -78,24 +75,22 @@
 	// Create the kafka client
 	test.kClient = mock_kafka.NewKafkaClient()
 	test.kEventClient = mock_kafka.NewKafkaClient()
-	test.oltAdapterName = "olt_adapter_mock"
-	test.onuAdapterName = "onu_adapter_mock"
+	test.oltAdapterName = "olt-mock-adapter"
+	test.onuAdapterName = "onu-mock-adapter"
 	test.coreInstanceID = "rw-da-test"
-	test.defaultTimeout = 5 * time.Second
-	test.maxTimeout = 20 * time.Second
+	test.internalTimeout = 5 * time.Second
 	test.done = make(chan int)
 	parentID := com.GetRandomString(10)
 	test.device = &voltha.Device{
-		Type:         "onu_adapter_mock",
+		Type:         "onu-mock-device-type",
 		ParentId:     parentID,
 		ParentPortNo: 1,
-		VendorId:     "onu_adapter_mock",
-		Adapter:      "onu_adapter_mock",
+		VendorId:     "onu-mock-vendor",
 		Vlan:         100,
 		Address:      nil,
 		ProxyAddress: &voltha.Device_ProxyAddress{
 			DeviceId:           parentID,
-			DeviceType:         "olt_adapter_mock",
+			DeviceType:         "olt-mock-device-type",
 			ChannelId:          100,
 			ChannelGroupId:     0,
 			ChannelTermination: "",
@@ -119,15 +114,14 @@
 func (dat *DATest) startCore(ctx context.Context) {
 	cfg := &config.RWCoreFlags{}
 	cfg.ParseCommandArguments([]string{})
-	cfg.CoreTopic = "rw_core"
 	cfg.EventTopic = "voltha.events"
-	cfg.DefaultRequestTimeout = dat.defaultTimeout
+	cfg.InternalTimeout = dat.internalTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal(ctx, "Cannot get a freeport for grpc")
 	}
-	cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+	cfg.GrpcNBIAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
 	client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
@@ -135,34 +129,20 @@
 		Address:                 cfg.KVStoreAddress,
 		Timeout:                 cfg.KVStoreTimeout,
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
-	dat.kmp = kafka.NewInterContainerProxy(
-		kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
-		kafka.MsgClient(dat.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
 
-	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
-	dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
+	dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, backend, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg, dat.coreInstanceID, eventProxy)
-	dat.adapterMgr.Start(context.Background())
-	if err = dat.kmp.Start(ctx); err != nil {
-		logger.Fatal(ctx, "Cannot start InterContainerProxy")
-	}
-
-	if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
-		logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
-	}
-
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, cfg, dat.coreInstanceID, eventProxy)
+	dat.adapterMgr.Start(context.Background(), "agent-test")
+	dat.registerAdapters(context.Background())
+	log.SetAllLogLevel(log.FatalLevel)
 }
 
 func (dat *DATest) stopAll(ctx context.Context) {
 	if dat.kClient != nil {
 		dat.kClient.Stop(ctx)
 	}
-	if dat.kmp != nil {
-		dat.kmp.Stop(ctx)
-	}
 	if dat.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
 	}
@@ -174,7 +154,7 @@
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+	deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout)
 	d, err := deviceAgent.start(context.TODO(), false, clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
@@ -215,19 +195,13 @@
 		deviceToUpdate.MacAddress = macAddress
 		deviceToUpdate.Vlan = vlan
 		deviceToUpdate.Reason = reason
+		deviceToUpdate.OperStatus = voltha.OperStatus_ACTIVE
+		deviceToUpdate.ConnectStatus = voltha.ConnectStatus_REACHABLE
 		err := da.updateDeviceUsingAdapterData(context.Background(), deviceToUpdate)
 		assert.Nil(t, err)
 		localWG.Done()
 	}()
 
-	// Update the device status routine
-	localWG.Add(1)
-	go func() {
-		err := da.updateDeviceStatus(context.Background(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
-		assert.Nil(t, err)
-		localWG.Done()
-	}()
-
 	// Add a port routine
 	localWG.Add(1)
 	go func() {
@@ -253,8 +227,6 @@
 	updatedDevice, _ := da.getDeviceReadOnly(context.Background())
 	updatedDevicePorts := da.listDevicePorts()
 	assert.NotNil(t, updatedDevice)
-	fmt.Printf("1 %+v\n", expectedChange)
-	fmt.Printf("2 %+v\n", updatedDevice)
 	assert.True(t, proto.Equal(expectedChange, updatedDevice))
 	assert.Equal(t, len(originalDevicePorts)+1, len(updatedDevicePorts))
 	assert.True(t, proto.Equal(updatedDevicePorts[portToAdd.PortNo], portToAdd))
@@ -268,7 +240,7 @@
 		da := newDATest(ctx)
 		assert.NotNil(t, da)
 		defer da.stopAll(ctx)
-		log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+
 		// Start the Core
 		da.startCore(ctx)
 
@@ -279,7 +251,6 @@
 			a := da.createDeviceAgent(t)
 			go da.updateDeviceConcurrently(t, a, &wg)
 		}
-
 		wg.Wait()
 	}
 }
@@ -292,8 +263,6 @@
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 	// Start the Core
 	da.startCore(ctx)
-	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
-
 	a := da.createDeviceAgent(t)
 	err1 := a.requestQueue.WaitForGreenLight(ctx)
 	assert.Nil(t, err1)
@@ -312,7 +281,6 @@
 
 	// Start the Core
 	da.startCore(ctx)
-	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
 	a := da.createDeviceAgent(t)
 	err1 := a.requestQueue.WaitForGreenLight(ctx)
 	assert.Nil(t, err1)
@@ -380,7 +348,9 @@
 		{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
 	}
 	err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	// Expect specific error as adapter communication, for unit tests, are not set
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
 	daFlows := changeToFlowList(da.listDeviceFlows())
 	assert.True(t, isFlowSliceEqual(newFlows, daFlows))
 
@@ -399,7 +369,8 @@
 	}
 
 	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
 	daFlows = changeToFlowList(da.listDeviceFlows())
 	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 
@@ -420,7 +391,8 @@
 	}
 
 	err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daFlows = changeToFlowList(da.listDeviceFlows())
 	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 
@@ -459,7 +431,8 @@
 	}
 
 	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daFlows = changeToFlowList(da.listDeviceFlows())
 	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 	//Delete flows with an unexisting one
@@ -474,7 +447,8 @@
 	}
 
 	err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daFlows = changeToFlowList(da.listDeviceFlows())
 	assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
 }
@@ -486,7 +460,8 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
 	}
 	err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
 	daGroups := changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(newGroups, daGroups))
 
@@ -502,7 +477,8 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
 	}
 	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daGroups = changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
@@ -520,7 +496,8 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
 	err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daGroups = changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
@@ -536,7 +513,8 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
 	err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daGroups = changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
@@ -551,7 +529,8 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
 	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daGroups = changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 
@@ -565,7 +544,49 @@
 		{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
 	}
 	err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
-	assert.Nil(t, err)
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "Aborted"))
 	daGroups = changeToGroupList(da.listDeviceGroups())
 	assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
 }
+
+// registerAdapters registers the ONU and OLT adapters
+func (dat *DATest) registerAdapters(ctx context.Context) {
+	oltAdapter := &voltha.Adapter{
+		Id:             "olt-mock-adapter-1",
+		Vendor:         "olt-mock-vendor",
+		Version:        ver.VersionInfo.Version,
+		Type:           "olt-mock-adapter-type",
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       "mock-olt-endpoint",
+	}
+	types := []*voltha.DeviceType{{Id: "olt-mock-device-type", AdapterType: "olt-mock-adapter-type", AcceptsAddRemoveFlowUpdates: true}}
+	deviceTypes := &voltha.DeviceTypes{Items: types}
+	_, err := dat.adapterMgr.RegisterAdapter(ctx, &ic.AdapterRegistration{
+		Adapter: oltAdapter,
+		DTypes:  deviceTypes,
+	})
+	if err != nil {
+		logger.Fatalw(ctx, "olt registration failed", log.Fields{"error": err})
+	}
+
+	onuAdapter := &voltha.Adapter{
+		Id:             "onu-mock-adapter-1",
+		Vendor:         "onu-mock-vendor",
+		Version:        ver.VersionInfo.Version,
+		Type:           "onu-mock-adapter-type",
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       "mock-onu-endpoint",
+	}
+	types = []*voltha.DeviceType{{Id: "onu-mock-device-type", AdapterType: "onu-mock-adapter-type", AcceptsAddRemoveFlowUpdates: true}}
+	deviceTypes = &voltha.DeviceTypes{Items: types}
+	_, err = dat.adapterMgr.RegisterAdapter(ctx, &ic.AdapterRegistration{
+		Adapter: onuAdapter,
+		DTypes:  deviceTypes,
+	})
+	if err != nil {
+		logger.Fatalw(ctx, "onu registration failed", log.Fields{"error": err})
+	}
+}