VOL-4697: Fixes for rolling update case

Change-Id: I4c529ed8ec90013be0dd953ba4b2bf5708872e63
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index c1148b5..b48e603 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -383,7 +383,8 @@
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
 	logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
-		"current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "restarted-endpoint": adapter.Endpoint})
+		"current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas,
+		"restarted-endpoint": adapter.Endpoint, "current-version": adapter.Version})
 
 	numberOfDevicesToReconcile := 0
 	dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -856,6 +857,17 @@
 func (dMgr *Manager) adapterRestartedHandler(ctx context.Context, endpoint string) error {
 	// Get the adapter corresponding to that endpoint
 	if a, _ := dMgr.adapterMgr.GetAdapterWithEndpoint(ctx, endpoint); a != nil {
+		if rollingUpdate, _ := dMgr.adapterMgr.GetRollingUpdate(ctx, endpoint); rollingUpdate {
+			dMgr.adapterMgr.RegisterOnRxStreamCloseChMap(ctx, endpoint)
+			// Blocking call: wait for the old adapter's rx stream to close.
+			// That is the signal that the old adapter is completely down.
+			dMgr.adapterMgr.WaitOnRxStreamCloseCh(ctx, endpoint)
+			dMgr.adapterMgr.DeleteRollingUpdate(ctx, endpoint)
+			// In case of a rolling update, we need to start the connection towards the new adapter instance now.
+			if err := dMgr.adapterMgr.StartAdapterWithEndPoint(ctx, endpoint); err != nil {
+				return err
+			}
+		}
 		return dMgr.adapterRestarted(ctx, a)
 	}
 	logger.Errorw(ctx, "restarted-adapter-not-found", log.Fields{"endpoint": endpoint})
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 7a17d6d..c0e1b6b 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -548,12 +548,14 @@
 		tempClient, err = stream.Recv()
 		if err != nil {
 			logger.Warnw(ctx, "received-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
 			break loop
 		}
 		// Send a response back
 		err = stream.Send(&health.HealthStatus{State: health.HealthStatus_HEALTHY})
 		if err != nil {
 			logger.Warnw(ctx, "sending-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+			dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
 			break loop
 		}