VOL-4697: Fixes for rolling update case
Change-Id: I4c529ed8ec90013be0dd953ba4b2bf5708872e63
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index e7c3d2b..0a2b632 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -95,6 +95,7 @@
if err != nil {
return nil, err
}
+
c, ok := client.(adapter_service.AdapterServiceClient)
if ok {
return c, nil
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index f5a6cac..9a1ff1c 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -53,6 +53,10 @@
lockAdapterEndPointsMap sync.RWMutex
liveProbeInterval time.Duration
coreEndpoint string
+ rollingUpdateMap map[string]bool
+ rollingUpdateLock sync.RWMutex
+ rxStreamCloseChMap map[string]chan bool
+ rxStreamCloseChLock sync.RWMutex
}
// SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
@@ -68,14 +72,16 @@
liveProbeInterval time.Duration,
) *Manager {
return &Manager{
- adapterDbProxy: dbPath.Proxy("adapters"),
- deviceTypeDbProxy: dbPath.Proxy("device_types"),
- deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*agent),
- adapterEndpoints: make(map[Endpoint]*agent),
- endpointMgr: NewEndpointManager(backend),
- liveProbeInterval: liveProbeInterval,
- coreEndpoint: coreEndpoint,
+ adapterDbProxy: dbPath.Proxy("adapters"),
+ deviceTypeDbProxy: dbPath.Proxy("device_types"),
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*agent),
+ adapterEndpoints: make(map[Endpoint]*agent),
+ endpointMgr: NewEndpointManager(backend),
+ liveProbeInterval: liveProbeInterval,
+ coreEndpoint: coreEndpoint,
+ rollingUpdateMap: make(map[string]bool),
+ rxStreamCloseChMap: make(map[string]chan bool),
}
}
@@ -196,6 +202,38 @@
return nil
}
+func (aMgr *Manager) updateAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
+ aMgr.lockAdapterAgentsMap.Lock()
+ defer aMgr.lockAdapterAgentsMap.Unlock()
+ logger.Debugw(ctx, "updating-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint,
+ "version": adapter.Version})
+ if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
+ logger.Errorw(ctx, "adapter-does-not-exist", log.Fields{"adapterName": adapter.Id})
+ return fmt.Errorf("does-not-exist")
+ }
+ if saveToDb {
+ // Update the adapter to the KV store
+ if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
+ logger.Errorw(ctx, "failed-to-update-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas,
+ "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas,
+ "version": adapter.Version})
+ return err
+ }
+ logger.Debugw(ctx, "adapter-updated-to-kv-store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint,
+ "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas, "version": adapter.Version})
+ }
+ clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
+ // Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
+ // This handler just logs the restart event. The actual action taken following an adapter restart
+ // will be done when an adapter re-registers itself.
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+ aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
+ return nil
+}
+
func (aMgr *Manager) addDeviceTypes(ctx context.Context, deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
if deviceTypes == nil {
return fmt.Errorf("no-device-type")
@@ -304,16 +342,27 @@
}
if adpt, _ := aMgr.getAdapter(ctx, adapter.Id); adpt != nil {
- // Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
- logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
-
- // First reset the adapter connection
agt, err := aMgr.getAgent(ctx, adpt.Id)
if err != nil {
logger.Errorw(ctx, "no-adapter-agent", log.Fields{"error": err})
return nil, err
}
- agt.resetConnection(ctx)
+ if adapter.Version != adpt.Version {
+ // Rolling update scenario - could be downgrade or upgrade
+ logger.Infow(ctx, "rolling-update",
+ log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint, "old-version": adpt.Version, "new-version": adapter.Version})
+ // Stop the gRPC connection to the old adapter
+ agt.stop(ctx)
+ if err = aMgr.updateAdapter(ctx, adapter, true); err != nil {
+ return nil, err
+ }
+ aMgr.SetRollingUpdate(ctx, adapter.Endpoint, true)
+ } else {
+ // Adapter registered and version is the same. The adapter may have restarted.
+ // Trigger the reconcile process for that adapter
+ logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
+ agt.resetConnection(ctx)
+ }
go func() {
err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adpt.Endpoint)
@@ -355,6 +404,23 @@
return &empty.Empty{}, nil
}
+func (aMgr *Manager) StartAdapterWithEndPoint(ctx context.Context, endpoint string) error {
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ subCtx := log.WithSpanFromContext(context.Background(), ctx)
+ for _, adapterAgent := range aMgr.adapterAgents {
+ if adapterAgent.adapter.Endpoint == endpoint {
+ if err := adapterAgent.start(subCtx); err != nil {
+ logger.Errorw(subCtx, "failed-to-start-adapter", log.Fields{"error": err})
+ return err
+ }
+ return nil
+ }
+ }
+ logger.Errorw(ctx, "adapter-agent-not-found-for-endpoint", log.Fields{"endpoint": endpoint})
+ return fmt.Errorf("adapter-agent-not-found-for-endpoint-%s", endpoint)
+}
+
func (aMgr *Manager) GetAdapterTypeByVendorID(vendorID string) (string, error) {
aMgr.lockDeviceTypesMap.RLock()
defer aMgr.lockDeviceTypesMap.RUnlock()
@@ -421,6 +487,77 @@
return result, nil
}
+func (aMgr *Manager) GetRollingUpdate(ctx context.Context, endpoint string) (bool, bool) {
+ aMgr.rollingUpdateLock.RLock()
+ defer aMgr.rollingUpdateLock.RUnlock()
+ val, ok := aMgr.rollingUpdateMap[endpoint]
+ return val, ok
+}
+
+func (aMgr *Manager) SetRollingUpdate(ctx context.Context, endpoint string, status bool) {
+ aMgr.rollingUpdateLock.Lock()
+ defer aMgr.rollingUpdateLock.Unlock()
+ if res, ok := aMgr.rollingUpdateMap[endpoint]; ok {
+ logger.Warnw(ctx, "possible-duplicate-rolling-update-overwriting", log.Fields{"old-status": res, "endpoint": endpoint})
+ }
+ aMgr.rollingUpdateMap[endpoint] = status
+}
+
+func (aMgr *Manager) DeleteRollingUpdate(ctx context.Context, endpoint string) {
+ aMgr.rollingUpdateLock.Lock()
+ defer aMgr.rollingUpdateLock.Unlock()
+ delete(aMgr.rollingUpdateMap, endpoint)
+}
+
+func (aMgr *Manager) RegisterOnRxStreamCloseChMap(ctx context.Context, endpoint string) {
+ aMgr.rxStreamCloseChLock.Lock()
+ defer aMgr.rxStreamCloseChLock.Unlock()
+ if _, ok := aMgr.rxStreamCloseChMap[endpoint]; ok {
+ logger.Warnw(ctx, "duplicate-entry-on-rxStreamCloseChMap-overwriting", log.Fields{"endpoint": endpoint})
+ // First close the old channel
+ close(aMgr.rxStreamCloseChMap[endpoint])
+ }
+ aMgr.rxStreamCloseChMap[endpoint] = make(chan bool, 1)
+}
+
+func (aMgr *Manager) SignalOnRxStreamCloseCh(ctx context.Context, endpoint string) {
+ var closeCh chan bool
+ ok := false
+ aMgr.rxStreamCloseChLock.RLock()
+ if closeCh, ok = aMgr.rxStreamCloseChMap[endpoint]; !ok {
+ logger.Infow(ctx, "no-entry-on-rxStreamCloseChMap", log.Fields{"endpoint": endpoint})
+ aMgr.rxStreamCloseChLock.RUnlock()
+ return
+ }
+ aMgr.rxStreamCloseChLock.RUnlock()
+
+ // close the rx channel
+ closeCh <- true
+
+ aMgr.rxStreamCloseChLock.Lock()
+ defer aMgr.rxStreamCloseChLock.Unlock()
+ delete(aMgr.rxStreamCloseChMap, endpoint)
+}
+
+func (aMgr *Manager) WaitOnRxStreamCloseCh(ctx context.Context, endpoint string) {
+ var closeCh chan bool
+ ok := false
+ aMgr.rxStreamCloseChLock.RLock()
+ if closeCh, ok = aMgr.rxStreamCloseChMap[endpoint]; !ok {
+ logger.Warnw(ctx, "no-entry-on-rxStreamCloseChMap", log.Fields{"endpoint": endpoint})
+ aMgr.rxStreamCloseChLock.RUnlock()
+ return
+ }
+ aMgr.rxStreamCloseChLock.RUnlock()
+
+ select {
+ case <-closeCh:
+ logger.Infow(ctx, "rx-stream-closed-for-endpoint", log.Fields{"endpoint": endpoint})
+ case <-time.After(60 * time.Second):
+ logger.Warnw(ctx, "timeout-waiting-for-rx-stream-close", log.Fields{"endpoint": endpoint})
+ }
+}
+
func (aMgr *Manager) getAgent(ctx context.Context, adapterID string) (*agent, error) {
aMgr.lockAdapterAgentsMap.RLock()
defer aMgr.lockAdapterAgentsMap.RUnlock()
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index c1148b5..b48e603 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -383,7 +383,8 @@
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
- "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "restarted-endpoint": adapter.Endpoint})
+ "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas,
+ "restarted-endpoint": adapter.Endpoint, "current-version": adapter.Version})
numberOfDevicesToReconcile := 0
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -856,6 +857,17 @@
func (dMgr *Manager) adapterRestartedHandler(ctx context.Context, endpoint string) error {
// Get the adapter corresponding to that endpoint
if a, _ := dMgr.adapterMgr.GetAdapterWithEndpoint(ctx, endpoint); a != nil {
+ if rollingUpdate, _ := dMgr.adapterMgr.GetRollingUpdate(ctx, endpoint); rollingUpdate {
+ dMgr.adapterMgr.RegisterOnRxStreamCloseChMap(ctx, endpoint)
+ // Blocking call: wait for the old adapter's rx stream to close.
+ // That is a signal that the old adapter is completely down
+ dMgr.adapterMgr.WaitOnRxStreamCloseCh(ctx, endpoint)
+ dMgr.adapterMgr.DeleteRollingUpdate(ctx, endpoint)
+ // In case of rolling update we need to start the connection towards the new adapter instance now
+ if err := dMgr.adapterMgr.StartAdapterWithEndPoint(ctx, endpoint); err != nil {
+ return err
+ }
+ }
return dMgr.adapterRestarted(ctx, a)
}
logger.Errorw(ctx, "restarted-adapter-not-found", log.Fields{"endpoint": endpoint})
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 7a17d6d..c0e1b6b 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -548,12 +548,14 @@
tempClient, err = stream.Recv()
if err != nil {
logger.Warnw(ctx, "received-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+ dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
break loop
}
// Send a response back
err = stream.Send(&health.HealthStatus{State: health.HealthStatus_HEALTHY})
if err != nil {
logger.Warnw(ctx, "sending-stream-error", log.Fields{"remote-client": remoteClient, "error": err})
+ dMgr.adapterMgr.SignalOnRxStreamCloseCh(ctx, remoteClient.Endpoint)
break loop
}
diff --git a/rw_core/main.go b/rw_core/main.go
index 8a9675d..715285d 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -140,5 +140,6 @@
core.Stop(shutdownCtx)
elapsed := time.Since(start)
+
logger.Infow(ctx, "rw-core-run-time", log.Fields{"core": instanceID, "time": elapsed / time.Second})
}