[VOL-4442] gRPC streaming connection monitoring

Change-Id: I435a03fdc0ac2b549dc4512220148cb19c16db19
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0e46594..8248278 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -51,7 +51,6 @@
 	"github.com/opencord/voltha-protos/v5/go/common"
 	ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
 	"github.com/opencord/voltha-protos/v5/go/extension"
-	"github.com/opencord/voltha-protos/v5/go/health"
 	ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
 	"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
 	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
@@ -85,7 +84,7 @@
 	lockChildAdapterClients sync.RWMutex
 	EventProxy              eventif.EventProxy
 	openOLT                 *OpenOLT
-	exitChannel             chan int
+	exitChannel             chan struct{}
 	lockDevice              sync.RWMutex
 	Client                  oop.OpenoltClient
 	transitionMap           *TransitionMap
@@ -192,7 +191,7 @@
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	dh.device = cloned
 	dh.openOLT = adapter
-	dh.exitChannel = make(chan int, 1) // TODO: Why buffered?
+	dh.exitChannel = make(chan struct{})
 	dh.lockDevice = sync.RWMutex{}
 	dh.stopCollector = make(chan bool, 2)      // TODO: Why buffered?
 	dh.stopHeartbeatCheck = make(chan bool, 2) // TODO: Why buffered?
@@ -229,13 +228,15 @@
 	logger.Debug(ctx, "device-agent-started")
 }
 
-// stop stops the device dh.  Not much to do for now
-func (dh *DeviceHandler) stop(ctx context.Context) {
+// Stop stops the device handler
+func (dh *DeviceHandler) Stop(ctx context.Context) {
 	dh.lockDevice.Lock()
 	defer dh.lockDevice.Unlock()
 	logger.Debug(ctx, "stopping-device-agent")
-	dh.exitChannel <- 1
+	close(dh.exitChannel)
 
+	// Delete all gRPC connections to the child adapters; deleting a connection also stops it
+	dh.deleteAdapterClients(ctx)
 	logger.Debug(ctx, "device-agent-stopped")
 }
 
@@ -3219,11 +3220,13 @@
 	if dh.childAdapterClients[endpoint], err = vgrpc.NewClient(
 		dh.cfg.AdapterEndpoint,
 		endpoint,
-		dh.onuAdapterRestarted); err != nil {
+		"onu_inter_adapter_service.OnuInterAdapterService",
+		dh.onuInterAdapterRestarted,
+		vgrpc.ClientContextData(dh.device.Id)); err != nil {
 		logger.Errorw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": endpoint})
 		return err
 	}
-	go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.setAndTestOnuInterAdapterServiceHandler)
+	go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.getOnuInterAdapterServiceClientHandler)
 
 	// Wait until we have a connection to the child adapter.
 	// Unlimited retries or until context expires
@@ -3281,30 +3284,18 @@
 	}
 }
 
-// TODO:  Any action the adapter needs to do following a onu adapter restart?
-func (dh *DeviceHandler) onuAdapterRestarted(ctx context.Context, endPoint string) error {
-	logger.Warnw(ctx, "onu-adapter-reconnected", log.Fields{"endpoint": endPoint})
+// TODO: Does the adapter need to take any action after the ONU adapter's inter-adapter service restarts?
+func (dh *DeviceHandler) onuInterAdapterRestarted(ctx context.Context, endPoint string) error {
+	logger.Warnw(ctx, "onu-inter-adapter-service-reconnected", log.Fields{"endpoint": endPoint})
 	return nil
 }
 
-// setAndTestOnuInterAdapterServiceHandler is used to test whether the remote gRPC service is up
-func (dh *DeviceHandler) setAndTestOnuInterAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
-	// The onu adapter needs to know whether the olt adapter can connect to it.   Since the olt adapter
-	// has a grpc client connection per device handler (i.e. per olt device) to the onu adapter
-	// then the onu adapter needs to know whether that specific client can connect to it. Because the
-	// client uses a polling mechanism then not all grpc clients could be connected at the same time,
-	// a maximum difference of 5 sec.  We therefore add the parent device as additional contextual information
-	// to this request.
-	dh.lockDevice.RLock()
-	if dh.device != nil {
-		clientConn.ContextInfo = dh.device.Id
-	}
-	dh.lockDevice.RUnlock()
-	svc := onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getOnuInterAdapterServiceClientHandler returns an OnuInterAdapterService gRPC client for conn, or nil if conn is nil
+func (dh *DeviceHandler) getOnuInterAdapterServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	if conn == nil {
 		return nil
 	}
-	return svc
+	return onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
 }
 
 func (dh *DeviceHandler) setDeviceDeletionInProgressFlag(flag bool) {