VOL-4697: Fixes for rolling update case
Change-Id: I4c529ed8ec90013be0dd953ba4b2bf5708872e63
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index c1148b5..b48e603 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -383,7 +383,8 @@
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
- "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "restarted-endpoint": adapter.Endpoint})
+ "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas,
+ "restarted-endpoint": adapter.Endpoint, "current-version": adapter.Version})
numberOfDevicesToReconcile := 0
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -856,6 +857,17 @@
func (dMgr *Manager) adapterRestartedHandler(ctx context.Context, endpoint string) error {
// Get the adapter corresponding to that endpoint
if a, _ := dMgr.adapterMgr.GetAdapterWithEndpoint(ctx, endpoint); a != nil {
+ if rollingUpdate, _ := dMgr.adapterMgr.GetRollingUpdate(ctx, endpoint); rollingUpdate {
+ dMgr.adapterMgr.RegisterOnRxStreamCloseChMap(ctx, endpoint)
+ // Blocking call. Wait for the old adapter's rx stream to close.
+ // That is the signal that the old adapter is completely down.
+ dMgr.adapterMgr.WaitOnRxStreamCloseCh(ctx, endpoint)
+ dMgr.adapterMgr.DeleteRollingUpdate(ctx, endpoint)
+ // In case of rolling update we need to start the connection towards the new adapter instance now
+ if err := dMgr.adapterMgr.StartAdapterWithEndPoint(ctx, endpoint); err != nil {
+ return err
+ }
+ }
return dMgr.adapterRestarted(ctx, a)
}
logger.Errorw(ctx, "restarted-adapter-not-found", log.Fields{"endpoint": endpoint})
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 7a17d6d..c0e1b6b 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -548,12 +548,14 @@
tempClient, err = stream.Recv()
if err != nil {
logger.Warnw(ctx, "received-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+ dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
break loop
}
// Send a response back
err = stream.Send(&health.HealthStatus{State: health.HealthStatus_HEALTHY})
if err != nil {
logger.Warnw(ctx, "sending-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+ dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
break loop
}