[VOL-4442] grpc streaming connection monitoring
Change-Id: I435a03fdc0ac2b549dc4512220148cb19c16db19
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 0e46594..8248278 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -51,7 +51,6 @@
"github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/extension"
- "github.com/opencord/voltha-protos/v5/go/health"
ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
of "github.com/opencord/voltha-protos/v5/go/openflow_13"
@@ -85,7 +84,7 @@
lockChildAdapterClients sync.RWMutex
EventProxy eventif.EventProxy
openOLT *OpenOLT
- exitChannel chan int
+ exitChannel chan struct{}
lockDevice sync.RWMutex
Client oop.OpenoltClient
transitionMap *TransitionMap
@@ -192,7 +191,7 @@
cloned := (proto.Clone(device)).(*voltha.Device)
dh.device = cloned
dh.openOLT = adapter
- dh.exitChannel = make(chan int, 1) // TODO: Why buffered?
+ dh.exitChannel = make(chan struct{})
dh.lockDevice = sync.RWMutex{}
dh.stopCollector = make(chan bool, 2) // TODO: Why buffered?
dh.stopHeartbeatCheck = make(chan bool, 2) // TODO: Why buffered?
@@ -229,13 +228,15 @@
logger.Debug(ctx, "device-agent-started")
}
-// stop stops the device dh. Not much to do for now
-func (dh *DeviceHandler) stop(ctx context.Context) {
+// Stop stops the device handler
+func (dh *DeviceHandler) Stop(ctx context.Context) {
dh.lockDevice.Lock()
defer dh.lockDevice.Unlock()
logger.Debug(ctx, "stopping-device-agent")
- dh.exitChannel <- 1
+ close(dh.exitChannel)
+ // Delete (which will stop also) all grpc connections to the child adapters
+ dh.deleteAdapterClients(ctx)
logger.Debug(ctx, "device-agent-stopped")
}
@@ -3219,11 +3220,13 @@
if dh.childAdapterClients[endpoint], err = vgrpc.NewClient(
dh.cfg.AdapterEndpoint,
endpoint,
- dh.onuAdapterRestarted); err != nil {
+ "onu_inter_adapter_service.OnuInterAdapterService",
+ dh.onuInterAdapterRestarted,
+ vgrpc.ClientContextData(dh.device.Id)); err != nil {
logger.Errorw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": endpoint})
return err
}
- go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.setAndTestOnuInterAdapterServiceHandler)
+ go dh.childAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), dh.getOnuInterAdapterServiceClientHandler)
// Wait until we have a connection to the child adapter.
// Unlimited retries or until context expires
@@ -3281,30 +3284,18 @@
}
}
-// TODO: Any action the adapter needs to do following a onu adapter restart?
-func (dh *DeviceHandler) onuAdapterRestarted(ctx context.Context, endPoint string) error {
- logger.Warnw(ctx, "onu-adapter-reconnected", log.Fields{"endpoint": endPoint})
+// TODO: Any action the adapter needs to do following a onu adapter inter adapter service restart?
+func (dh *DeviceHandler) onuInterAdapterRestarted(ctx context.Context, endPoint string) error {
+ logger.Warnw(ctx, "onu-inter-adapter-service-reconnected", log.Fields{"endpoint": endPoint})
return nil
}
-// setAndTestOnuInterAdapterServiceHandler is used to test whether the remote gRPC service is up
-func (dh *DeviceHandler) setAndTestOnuInterAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
- // The onu adapter needs to know whether the olt adapter can connect to it. Since the olt adapter
- // has a grpc client connection per device handler (i.e. per olt device) to the onu adapter
- // then the onu adapter needs to know whether that specific client can connect to it. Because the
- // client uses a polling mechanism then not all grpc clients could be connected at the same time,
- // a maximum difference of 5 sec. We therefore add the parent device as additional contextual information
- // to this request.
- dh.lockDevice.RLock()
- if dh.device != nil {
- clientConn.ContextInfo = dh.device.Id
- }
- dh.lockDevice.RUnlock()
- svc := onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+// getOnuInterAdapterServiceClientHandler is used to setup the remote gRPC service
+func (dh *DeviceHandler) getOnuInterAdapterServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ if conn == nil {
return nil
}
- return svc
+ return onu_inter_adapter_service.NewOnuInterAdapterServiceClient(conn)
}
func (dh *DeviceHandler) setDeviceDeletionInProgressFlag(flag bool) {