[VOL-4412] ONU reconcile regression
This commit adds further checks to the gRPC client to ensure that
no two monitoring processes can run at the same time.
Change-Id: Ifbcbda04305227abe0cdd077174f084ae6b8f19f
diff --git a/VERSION b/VERSION
index a50da18..4489f5a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.0.3
+7.0.4
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index de649d6..a3dec75 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -228,8 +228,8 @@
isGrpcMonitorKeyPresentInContext(ctx) {
c.stateLock.Lock()
if c.state == stateConnected {
- logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err})
c.state = stateDisconnected
+ logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
c.events <- eventDisconnected
}
c.stateLock.Unlock()
@@ -290,6 +290,9 @@
c.activeCh = make(chan struct{}, 10)
c.activeChMutex.Unlock()
+ grpcMonitorCheckRunning := false
+ var grpcMonitorCheckRunningLock sync.RWMutex
+
// Interval to wait for no activity before probing the connection
timeout := c.monitorInterval
loop:
@@ -298,11 +301,14 @@
select {
case <-c.activeCh:
- logger.Debugw(ctx, "received-active-notification", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "endpoint-reachable", log.Fields{"endpoint": c.apiEndPoint})
// Reset timer
if !timeoutTimer.Stop() {
- <-timeoutTimer.C
+ select {
+ case <-timeoutTimer.C:
+ default:
+ }
}
case <-ctx.Done():
@@ -312,10 +318,21 @@
// Trigger an activity check if the state is connected. If the state is not connected then there is already
// a backoff retry mechanism in place to retry establishing connection.
c.stateLock.RLock()
- runCheck := c.state == stateConnected
+ grpcMonitorCheckRunningLock.RLock()
+ runCheck := (c.state == stateConnected) && !grpcMonitorCheckRunning
+ grpcMonitorCheckRunningLock.RUnlock()
c.stateLock.RUnlock()
if runCheck {
go func() {
+ grpcMonitorCheckRunningLock.Lock()
+ if grpcMonitorCheckRunning {
+ grpcMonitorCheckRunningLock.Unlock()
+ logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.apiEndPoint})
+ return
+ }
+ grpcMonitorCheckRunning = true
+ grpcMonitorCheckRunningLock.Unlock()
+
logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
defer cancel()
@@ -326,6 +343,9 @@
response := handler(subCtx, c.connection)
logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
}
+ grpcMonitorCheckRunningLock.Lock()
+ grpcMonitorCheckRunning = false
+ grpcMonitorCheckRunningLock.Unlock()
}()
}
}
@@ -362,9 +382,8 @@
logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
switch event {
case eventConnecting:
- logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt})
-
c.stateLock.Lock()
+ logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt, "curr-state": c.state})
if c.state == stateConnected {
c.state = stateDisconnected
}
@@ -393,9 +412,9 @@
c.stateLock.Unlock()
case eventConnected:
- logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint})
attempt = 1
c.stateLock.Lock()
+ logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
if c.state != stateConnected {
c.state = stateConnected
if initialConnection {
@@ -418,7 +437,9 @@
if p != nil {
p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
}
- logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "status": c.state})
+ c.stateLock.RLock()
+ logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
+ c.stateLock.RUnlock()
// Try to connect again
c.events <- eventConnecting