[VOL-4442] gRPC streaming connection monitoring

Change-Id: I8a361473a252f6d2b64578a97980b2b7b3618f55
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index 96bfabe..edc5088 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -38,9 +38,7 @@
 	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
 	"github.com/opencord/voltha-lib-go/v7/pkg/version"
 	"github.com/opencord/voltha-protos/v5/go/adapter_service"
-	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/core_service"
-	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
@@ -60,14 +58,15 @@
 
 type adapter struct {
 	//defaultAppName   string
-	instanceID  string
-	config      *config.AdapterFlags
-	kafkaClient kafka.Client
-	kvClient    kvstore.Client
-	eventProxy  eventif.EventProxy
-	grpcServer  *vgrpc.GrpcServer
-	onuAdapter  *ac.OpenONUAC
-	coreClient  *vgrpc.Client
+	instanceID      string
+	config          *config.AdapterFlags // adapter flags; supplies the adapter and core endpoints
+	kafkaClient     kafka.Client
+	kvClient        kvstore.Client            // KV store client; handed to the ONU adapter, cleaned up in stop
+	eventProxy      eventif.EventProxy        // event proxy handed to the ONU adapter when it is started
+	grpcServer      *vgrpc.GrpcServer         // gRPC server on which the adapter and inter-adapter services are registered
+	onuAdapter      *ac.OpenONUAC             // ONU adapter returned by startONUAdapter; registered as the adapter service
+	onuInterAdapter *ac.OpenONUACInterAdapter // built from onuAdapter; registered as the ONU inter-adapter service
+	coreClient      *vgrpc.Client             // gRPC client created for config.CoreEndpoint
 }
 
 func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -133,15 +132,21 @@
 	if a.coreClient, err = vgrpc.NewClient(
 		a.config.AdapterEndpoint,
 		a.config.CoreEndpoint,
+		"core_service.CoreService",
 		a.coreRestarted); err != nil {
 		logger.Fatal(ctx, "grpc-client-not-created")
 	}
 	// Start the core grpc client
-	go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+	go a.coreClient.Start(ctx, getCoreServiceClientHandler)
 
 	// Create the open ONU interface adapter
-	if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
-		logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
+	if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
+		logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
+	}
+
+	// Create the open ONU Inter adapter
+	if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
+		logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
 	}
 
 	// Create and start the grpc server
@@ -151,7 +156,7 @@
 	a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
 
 	//Register the onu inter adapter  service
-	a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
+	a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
 
 	go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
 
@@ -171,16 +176,22 @@
 	return nil
 }
 
-// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
-	svc := core_service.NewCoreServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getCoreServiceClientHandler returns a CoreService client bound to conn, or nil when conn is nil.
+func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	if conn == nil {
 		return nil
 	}
-	return svc
+	return core_service.NewCoreServiceClient(conn)
 }
 
 func (a *adapter) stop(ctx context.Context) {
+	// Cleanup the grpc services first
+	if err := a.onuAdapter.Stop(ctx); err != nil {
+		logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
+	}
+	if err := a.onuInterAdapter.Stop(ctx); err != nil {
+		logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
+	}
 	// Cleanup - applies only if we had a kvClient
 	if a.kvClient != nil {
 		// Release all reservations
@@ -248,7 +259,7 @@
 	return nil
 }
 
-func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
+func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
 	cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
 	var err error
 	sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
@@ -262,6 +273,19 @@
 	return sAcONU, nil
 }
 
+func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
+	var err error
+	sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
+
+	if err = sAcONUInterAdapter.Start(ctx); err != nil {
+		logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
+		return nil, err
+	}
+
+	logger.Info(ctx, "OpenONUACInterAdapter-started")
+	return sAcONUInterAdapter, nil
+}
+
 func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
 	adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
 	vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")