[VOL-4292] OpenOLT Adapter changes for gRPC migration

Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index 617007d..f33efd0 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -17,30 +17,19 @@
 
 import (
 	"context"
-	"errors"
 	"testing"
-	"time"
 
-	conf "github.com/opencord/voltha-lib-go/v6/pkg/config"
-
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
+	conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	mgrpc "github.com/opencord/voltha-lib-go/v7/pkg/mocks/grpc"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
-	"github.com/opencord/voltha-openolt-adapter/pkg/mocks"
-	ca "github.com/opencord/voltha-protos/v4/go/inter_container"
 	"go.etcd.io/etcd/pkg/mock/mockserver"
 )
 
 func newMockAdapter() *adapter {
-	conf := config.NewAdapterFlags()
-	conf.KVStoreType = "etcd"
-	cp := mocks.MockCoreProxy{}
-	ap := mocks.MockAdapterProxy{}
-	ad := newAdapter(conf)
-	ad.coreProxy = &cp
-	ad.adapterProxy = &ap
+	cf := config.NewAdapterFlags()
+	cf.KVStoreType = "etcd"
+	ad := newAdapter(cf)
 	return ad
 }
 func Test_adapter_setKVClient(t *testing.T) {
@@ -85,24 +74,27 @@
 func Test_registerWithCore(t *testing.T) {
 	ad := newMockAdapter()
 	ctx := context.TODO()
-	err := ad.registerWithCore(ctx, 1)
+	// Create a and start a mock Core GRPC server
+	ms, err := mgrpc.NewMockGRPCServer(ctx)
+	if err != nil {
+		t.Errorf("grpc server: expected error:nil, got error: %v", err)
+	}
+	ms.AddCoreService(ctx, &vgrpc.MockCoreServiceHandler{})
+	go ms.Start(ctx)
+	defer ms.Stop()
+
+	if ad.coreClient, err = vgrpc.NewClient(ms.ApiEndpoint,
+		ad.coreRestarted); err != nil {
+		t.Errorf("grpc client: expected error:nil, got error: %v", err)
+	}
+	// Start the core grpc client
+	go ad.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+	defer ad.coreClient.Stop(ctx)
+	err = ad.registerWithCore(ctx, coreService, 1)
 	if err != nil {
 		t.Errorf("Expected error:nil, got error: %v", err)
 	}
 }
-func Test_startInterContainerProxy(t *testing.T) {
-	ad := newMockAdapter()
-	kc := &mockKafkaClient{}
-	ad.kafkaClient = kc
-	ctx := context.TODO()
-	icp, err := ad.startInterContainerProxy(ctx, 1)
-	if icp != nil {
-		t.Log("Intercontainer proxy ", icp)
-	}
-	if err != nil {
-		t.Errorf("err %v", err)
-	}
-}
 
 func Test_startOpenOLT(t *testing.T) {
 	a, _ := mockserver.StartMockServers(1)
@@ -111,8 +103,7 @@
 	defer a.StopAt(0)
 
 	ad := newMockAdapter()
-	oolt, err := ad.startOpenOLT(context.TODO(), nil,
-		ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
+	oolt, err := ad.startOpenOLT(context.TODO(), nil, ad.eventProxy, ad.config, cm)
 	if oolt != nil {
 		t.Log("Open OLT ", oolt)
 	}
@@ -136,8 +127,8 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
-		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
+		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
+		{"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -150,83 +141,3 @@
 		})
 	}
 }
-
-func Test_adapter_setupRequestHandler(t *testing.T) {
-
-	ad := newMockAdapter()
-	cm := &conf.ConfigManager{}
-
-	kip := kafka.NewInterContainerProxy(
-		kafka.InterContainerAddress(ad.config.KafkaAdapterAddress),
-		kafka.MsgClient(&mockKafkaClient{}),
-		kafka.DefaultTopic(&kafka.Topic{Name: ad.config.Topic}))
-
-	ad.kip = kip
-	_ = ad.kip.Start(context.Background())
-
-	oolt, _ := ad.startOpenOLT(context.TODO(), nil,
-		ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
-	printBanner()
-	printVersion()
-	ctx := context.TODO()
-	if err := ad.setupRequestHandler(ctx, ad.config.InstanceID, oolt); err != nil {
-		t.Logf("adapter.setupRequestHandler() error = %v", err)
-	}
-
-}
-
-// Kafka client mocker
-type mockKafkaClient struct {
-}
-
-func (kc *mockKafkaClient) Start(ctx context.Context) error {
-	return nil
-}
-func (kc *mockKafkaClient) Stop(ctx context.Context) {
-}
-func (kc *mockKafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
-	if topic != nil {
-		return nil
-	}
-	return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
-	if topic != nil {
-		return nil
-	}
-	return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ca.InterContainerMessage, error) {
-	if topic != nil {
-		ch := make(chan *ca.InterContainerMessage)
-		return ch, nil
-	}
-	return nil, errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ca.InterContainerMessage) error {
-	if topic == nil {
-		return nil
-	}
-	return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
-	if topic != nil {
-		return nil
-	}
-	return errors.New("invalid topic")
-}
-
-func (kc *mockKafkaClient) SendLiveness(ctx context.Context) error {
-	return status.Error(codes.Unimplemented, "SendLiveness")
-}
-
-func (kc *mockKafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	return nil
-}
-
-func (kc *mockKafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
-	return nil
-}
-
-func (kc *mockKafkaClient) SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time)) {
-}