[VOL-4442] grpc streaming connection monitoring

Change-Id: I6b26a29c74be8833e7262eb59d266e6cce66f0c3
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 83282e3..c6ce503 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -42,6 +42,7 @@
 	Shutdown    context.CancelFunc
 	Stopped     chan struct{}
 	KafkaClient kafka.Client
+	adapterMgr  *adapter.Manager
 }
 
 const (
@@ -90,13 +91,6 @@
 func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) {
 	logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
 
-	// deferred functions are used to run cleanup
-	// failing partway will stop anything that's been started
-	defer close(core.Stopped)
-	defer core.Shutdown()
-
-	logger.Info(ctx, "starting-rw-core-components")
-
 	// setup kv client
 	logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
 	kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
@@ -144,6 +138,10 @@
 	adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval)
 	adapterMgr.Start(ctx, adapterService)
 
+	// We do not defer adapterMgr.Stop() here because we want it to run as soon as
+	// the core is stopped
+	core.adapterMgr = adapterMgr
+
 	// create the core of the system, the device managers
 	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy)
 
@@ -153,6 +151,7 @@
 	if err != nil {
 		logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err})
 	}
+	defer deviceMgr.Stop(ctx, deviceService)
 
 	// Start the logical device manager to load the logical devices.
 	logicalDeviceMgr.Start(ctx, logicalDeviceService)
@@ -183,9 +182,11 @@
 }
 
 // Stop brings down core services
-func (core *Core) Stop() {
+func (core *Core) Stop(ctx context.Context) {
+	// Close all the gRPC client connections to the adapters first
+	core.adapterMgr.Stop(ctx)
 	core.Shutdown()
-	<-core.Stopped
+	close(core.Stopped)
 }
 
 // startGrpcSbiService creates the grpc core service handlers, registers it to the grpc server and starts the server