[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8114dd0..e611019 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -28,15 +28,15 @@
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	tst "github.com/opencord/voltha-go/rw_core/test"
-	com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events"
-	fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
-	mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events"
+	fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
 )
@@ -44,7 +44,6 @@
 type LDATest struct {
 	etcdServer       *mock_etcd.EtcdServer
 	deviceMgr        *Manager
-	kmp              kafka.InterContainerProxy
 	logicalDeviceMgr *LogicalManager
 	kClient          kafka.Client
 	kEventClient     kafka.Client
@@ -52,8 +51,8 @@
 	oltAdapterName   string
 	onuAdapterName   string
 	coreInstanceID   string
-	defaultTimeout   time.Duration
-	maxTimeout       time.Duration
+	internalTimeout  time.Duration
+	rpcTimeout       time.Duration
 	logicalDevice    *voltha.LogicalDevice
 	logicalPorts     map[uint32]*voltha.LogicalPort
 	deviceIds        []string
@@ -74,8 +73,8 @@
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
-	test.defaultTimeout = 5 * time.Second
-	test.maxTimeout = 20 * time.Second
+	test.internalTimeout = 5 * time.Second
+	test.rpcTimeout = 20 * time.Second
 	test.done = make(chan int)
 	test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
 	test.logicalDevice = &voltha.LogicalDevice{
@@ -138,15 +137,14 @@
 func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
 	cfg := &config.RWCoreFlags{}
 	cfg.ParseCommandArguments([]string{})
-	cfg.CoreTopic = "rw_core"
 	cfg.EventTopic = "voltha.events"
-	cfg.DefaultRequestTimeout = lda.defaultTimeout
+	cfg.InternalTimeout = lda.internalTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal(ctx, "Cannot get a freeport for grpc")
 	}
-	cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+	cfg.GrpcNBIAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
 	client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
@@ -154,29 +152,18 @@
 		Address:                 cfg.KVStoreAddress,
 		Timeout:                 cfg.KVStoreTimeout,
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
-	lda.kmp = kafka.NewInterContainerProxy(
-		kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
-		kafka.MsgClient(lda.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
 
-	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
-	adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
+	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, backend, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
-	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg, lda.coreInstanceID, eventProxy)
-	if err = lda.kmp.Start(ctx); err != nil {
-		logger.Fatal(ctx, "Cannot start InterContainerProxy")
-	}
-	adapterMgr.Start(context.Background())
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
+	adapterMgr.Start(context.Background(), "logical-test")
 }
 
 func (lda *LDATest) stopAll(ctx context.Context) {
 	if lda.kClient != nil {
 		lda.kClient.Stop(ctx)
 	}
-	if lda.kmp != nil {
-		lda.kmp.Stop(ctx)
-	}
 	if lda.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
 	}
@@ -191,7 +178,7 @@
 	clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
 	clonedLD.Id = com.GetRandomString(10)
 	clonedLD.DatapathId = rand.Uint64()
-	lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
+	lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.internalTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
 	for _, port := range lda.logicalPorts {
 		clonedPort := proto.Clone(port).(*voltha.LogicalPort)