[VOL-4442] grpc streaming connection monitoring

Change-Id: I435a03fdc0ac2b549dc4512220148cb19c16db19
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index 434fc83..cffd2ab 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -38,10 +38,8 @@
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
 	ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
 	"github.com/opencord/voltha-protos/v5/go/adapter_service"
-	"github.com/opencord/voltha-protos/v5/go/common"
 	ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
 	"github.com/opencord/voltha-protos/v5/go/core_service"
-	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
@@ -55,16 +53,17 @@
 )
 
 type adapter struct {
-	instanceID  string
-	config      *config.AdapterFlags
-	grpcServer  *vgrpc.GrpcServer
-	oltAdapter  *ac.OpenOLT
-	kafkaClient kafka.Client
-	kvClient    kvstore.Client
-	coreClient  *vgrpc.Client
-	eventProxy  eventif.EventProxy
-	halted      bool
-	exitChannel chan int
+	instanceID      string
+	config          *config.AdapterFlags
+	grpcServer      *vgrpc.GrpcServer
+	oltAdapter      *ac.OpenOLT
+	oltInterAdapter *ac.OpenOLTInterAdapter
+	kafkaClient     kafka.Client
+	kvClient        kvstore.Client
+	coreClient      *vgrpc.Client
+	eventProxy      eventif.EventProxy
+	halted          bool
+	exitChannel     chan int
 }
 
 func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -133,17 +132,23 @@
 	if a.coreClient, err = vgrpc.NewClient(
 		a.config.AdapterEndpoint,
 		a.config.CoreEndpoint,
+		"core_service.CoreService",
 		a.coreRestarted); err != nil {
 		logger.Fatal(ctx, "grpc-client-not-created")
 	}
 	// Start the core grpc client
-	go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+	go a.coreClient.Start(ctx, getCoreServiceClientHandler)
 
 	// Create the open OLT adapter
 	if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
 		logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
 	}
 
+	// Create the open OLT inter-adapter
+	if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
+		logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
+	}
+
 	// Create and start the grpc server
 	a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
 
@@ -151,7 +156,7 @@
 	a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
 
 	//Register the olt inter-adapter  service
-	a.addOltInterAdapterService(ctx, a.grpcServer, a.oltAdapter)
+	a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
 
 	// Start the grpc server
 	go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
@@ -171,13 +176,12 @@
 	return nil
 }
 
-// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
-	svc := core_service.NewCoreServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getCoreServiceClientHandler returns a CoreService client for conn, or nil if conn is nil
+func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	if conn == nil {
 		return nil
 	}
-	return svc
+	return core_service.NewCoreServiceClient(conn)
 }
 
 /**
@@ -239,6 +243,14 @@
 	// send exit signal
 	a.exitChannel <- 0
 
+	// Stop all grpc processing
+	if err := a.oltAdapter.Stop(ctx); err != nil {
+		logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
+	}
+	if err := a.oltInterAdapter.Stop(ctx); err != nil {
+		logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
+	}
+
 	// Cleanup - applies only if we had a kvClient
 	if a.kvClient != nil {
 		// Release all reservations
@@ -346,6 +358,19 @@
 	return sOLT, nil
 }
 
+func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
+	logger.Info(ctx, "starting-open-olt-inter-adapter")
+	var err error
+	sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
+
+	if err = sOLTInterAdapter.Start(ctx); err != nil {
+		return nil, err
+	}
+
+	logger.Info(ctx, "open-olt-inter-adapter-started")
+	return sOLTInterAdapter, nil
+}
+
 func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
 	adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
 	logger.Infow(ctx, "registering-with-core", log.Fields{
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index a70949d..5c470ac 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -86,11 +86,12 @@
 	if ad.coreClient, err = vgrpc.NewClient(
 		"olt-endpoint",
 		ms.ApiEndpoint,
+		"core_service.CoreService",
 		ad.coreRestarted); err != nil {
 		t.Errorf("grpc client: expected error:nil, got error: %v", err)
 	}
 	// Start the core grpc client
-	go ad.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+	go ad.coreClient.Start(ctx, getCoreServiceClientHandler)
 	defer ad.coreClient.Stop(ctx)
 	err = ad.registerWithCore(ctx, coreService, 1)
 	if err != nil {