[VOL-4514] Addressing device reconciliation failure
See comments on https://jira.opencord.org/browse/VOL-4514
This change depends on the related proto changes being merged
first. Until then, Jenkins will fail.
Change-Id: Idc0219135388c6b1b6bbeb0ce419193e777c2ceb
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index add2b28..9b66d85 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -27,6 +27,7 @@
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/core_service"
"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
@@ -36,7 +37,7 @@
type event byte
type state byte
-type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn) interface{}
+type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn, *common.Connection) interface{}
type RestartedHandler func(ctx context.Context, endPoint string) error
type contextKey string
@@ -83,7 +84,8 @@
)
type Client struct {
- apiEndPoint string
+ clientEndpoint string
+ serverEndPoint string
connection *grpc.ClientConn
connectionLock sync.RWMutex
stateLock sync.RWMutex
@@ -94,25 +96,17 @@
backoffInitialInterval time.Duration
backoffMaxInterval time.Duration
backoffMaxElapsedTime time.Duration
- activityCheck bool
monitorInterval time.Duration
- activeCh chan struct{}
- activeChMutex sync.RWMutex
done bool
livenessCallback func(timestamp time.Time)
}
type ClientOption func(*Client)
-func ActivityCheck(enable bool) ClientOption {
- return func(args *Client) {
- args.activityCheck = enable
- }
-}
-
-func NewClient(endpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
+func NewClient(clientEndpoint, serverEndpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
c := &Client{
- apiEndPoint: endpoint,
+ clientEndpoint: clientEndpoint,
+ serverEndPoint: serverEndpoint,
onRestart: onRestart,
events: make(chan event, 1),
state: stateDisconnected,
@@ -156,7 +150,7 @@
c.connectionLock.RLock()
defer c.connectionLock.RUnlock()
if c.service == nil {
- return nil, fmt.Errorf("no connection to %s", c.apiEndPoint)
+ return nil, fmt.Errorf("no connection to %s", c.serverEndPoint)
}
return c.service, nil
}
@@ -167,7 +161,7 @@
c.connectionLock.RLock()
defer c.connectionLock.RUnlock()
if c.service == nil {
- return nil, fmt.Errorf("no core connection to %s", c.apiEndPoint)
+ return nil, fmt.Errorf("no core connection to %s", c.serverEndPoint)
}
client, ok := c.service.(core_service.CoreServiceClient)
if ok {
@@ -182,7 +176,7 @@
c.connectionLock.RLock()
defer c.connectionLock.RUnlock()
if c.service == nil {
- return nil, fmt.Errorf("no child adapter connection to %s", c.apiEndPoint)
+ return nil, fmt.Errorf("no child adapter connection to %s", c.serverEndPoint)
}
client, ok := c.service.(onu_inter_adapter_service.OnuInterAdapterServiceClient)
if ok {
@@ -197,7 +191,7 @@
c.connectionLock.RLock()
defer c.connectionLock.RUnlock()
if c.service == nil {
- return nil, fmt.Errorf("no parent adapter connection to %s", c.apiEndPoint)
+ return nil, fmt.Errorf("no parent adapter connection to %s", c.serverEndPoint)
}
client, ok := c.service.(olt_inter_adapter_service.OltInterAdapterServiceClient)
if ok {
@@ -207,7 +201,7 @@
}
func (c *Client) Reset(ctx context.Context) {
- logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.serverEndPoint})
c.stateLock.Lock()
defer c.stateLock.Unlock()
if c.state == stateConnected {
@@ -222,7 +216,7 @@
err := invoker(ctx, method, req, reply, cc, opts...)
// On connection failure, start the reconnect process depending on the error response
if err != nil {
- logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.apiEndPoint})
+ logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.serverEndPoint})
if strings.Contains(err.Error(), connectionErrorSubString) ||
strings.Contains(err.Error(), connectionError) ||
strings.Contains(err.Error(), connectionSystemNotReady) ||
@@ -230,12 +224,12 @@
c.stateLock.Lock()
if c.state == stateConnected {
c.state = stateDisconnected
- logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
+ logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.serverEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
c.events <- eventDisconnected
}
c.stateLock.Unlock()
} else if strings.Contains(err.Error(), connectionClosedSubstring) {
- logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+ logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.serverEndPoint, "error": err})
}
return err
}
@@ -244,24 +238,17 @@
return nil
}
-// updateActivity pushes an activity indication on the channel so that the monitoring routine does not validate
-// the gRPC connection when the connection is being used. Note that this update is done both when the connection
-// is alive or a connection error is returned. A separate routine takes care of doing the re-connect.
+// updateActivity invokes the liveness callback, if one is set, while the client is in the connected state
func (c *Client) updateActivity(ctx context.Context) {
- c.activeChMutex.RLock()
- defer c.activeChMutex.RUnlock()
- if c.activeCh != nil {
- logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.apiEndPoint})
- c.activeCh <- struct{}{}
+ logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.serverEndPoint})
- // Update liveness only in connected state
- if c.livenessCallback != nil {
- c.stateLock.RLock()
- if c.state == stateConnected {
- c.livenessCallback(time.Now())
- }
- c.stateLock.RUnlock()
+ // Update liveness only in connected state
+ if c.livenessCallback != nil {
+ c.stateLock.RLock()
+ if c.state == stateConnected {
+ c.livenessCallback(time.Now())
}
+ c.stateLock.RUnlock()
}
}
@@ -282,14 +269,7 @@
// timeout, it will send a default API request on that connection. If the connection is good then nothing
// happens. If it's bad this will trigger reconnection attempts.
func (c *Client) monitorActivity(ctx context.Context, handler SetAndTestServiceHandler) {
- logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.apiEndPoint})
-
- // Create an activity monitor channel. Unbuffered channel works well. However, we use a buffered
- // channel here as a safeguard of having the grpc interceptor publishing too many events that can be
- // consumed by this monitoring thread
- c.activeChMutex.Lock()
- c.activeCh = make(chan struct{}, 10)
- c.activeChMutex.Unlock()
+ logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.serverEndPoint})
grpcMonitorCheckRunning := false
var grpcMonitorCheckRunningLock sync.RWMutex
@@ -301,18 +281,14 @@
timeoutTimer := time.NewTimer(timeout)
select {
- case <-c.activeCh:
- logger.Debugw(ctx, "endpoint-reachable", log.Fields{"endpoint": c.apiEndPoint})
-
- // Reset timer
+ case <-ctx.Done():
+ // Stop and drain timer
if !timeoutTimer.Stop() {
select {
case <-timeoutTimer.C:
default:
}
}
-
- case <-ctx.Done():
break loop
case <-timeoutTimer.C:
@@ -328,21 +304,21 @@
grpcMonitorCheckRunningLock.Lock()
if grpcMonitorCheckRunning {
grpcMonitorCheckRunningLock.Unlock()
- logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.serverEndPoint})
return
}
grpcMonitorCheckRunning = true
grpcMonitorCheckRunningLock.Unlock()
- logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.serverEndPoint})
subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
defer cancel()
subCtx = WithGrpcMonitorContext(subCtx, "grpc-monitor")
c.connectionLock.RLock()
defer c.connectionLock.RUnlock()
if c.connection != nil {
- response := handler(subCtx, c.connection)
- logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
+ response := handler(subCtx, c.connection, &common.Connection{Endpoint: c.clientEndpoint, KeepAliveInterval: int64(c.monitorInterval)})
+ logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.serverEndPoint, "up": response != nil})
}
grpcMonitorCheckRunningLock.Lock()
grpcMonitorCheckRunning = false
@@ -351,23 +327,21 @@
}
}
}
- logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.serverEndPoint})
}
// Start kicks off the adapter agent by trying to connect to the adapter
func (c *Client) Start(ctx context.Context, handler SetAndTestServiceHandler) {
- logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.serverEndPoint})
// If the context contains a k8s probe then register services
p := probe.GetProbeFromContext(ctx)
if p != nil {
- p.RegisterService(ctx, c.apiEndPoint)
+ p.RegisterService(ctx, c.serverEndPoint)
}
- // Enable activity check, if required
- if c.activityCheck {
- go c.monitorActivity(ctx, handler)
- }
+ // Enable activity check
+ go c.monitorActivity(ctx, handler)
initialConnection := true
c.events <- eventConnecting
@@ -377,22 +351,22 @@
for {
select {
case <-ctx.Done():
- logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.serverEndPoint})
break loop
case event := <-c.events:
- logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.serverEndPoint})
c.connectionLock.RLock()
// On a client stopped, just allow the stop event to go through
if c.done && event != eventStopped {
c.connectionLock.RUnlock()
- logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.serverEndPoint})
continue
}
c.connectionLock.RUnlock()
switch event {
case eventConnecting:
c.stateLock.Lock()
- logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt, "curr-state": c.state})
+ logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.serverEndPoint, "attempts": attempt, "curr-state": c.state})
if c.state == stateConnected {
c.state = stateDisconnected
}
@@ -403,12 +377,12 @@
c.stateLock.Lock()
c.state = stateDisconnected
c.stateLock.Unlock()
- logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.apiEndPoint, "attempt": attempt, "error": err})
+ logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.serverEndPoint, "attempt": attempt, "error": err})
// Retry connection after a delay
if err = backoff.Backoff(ctx); err != nil {
// Context has closed or reached maximum elapsed time, if set
- logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+ logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.serverEndPoint, "error": err})
return
}
attempt += 1
@@ -427,19 +401,19 @@
case eventConnected:
attempt = 1
c.stateLock.Lock()
- logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
+ logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.serverEndPoint, "curr-state": c.state})
if c.state != stateConnected {
c.state = stateConnected
if initialConnection {
- logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.serverEndPoint})
initialConnection = false
} else {
- logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.serverEndPoint})
// Trigger any callback on a restart
go func() {
- err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.apiEndPoint)
+ err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.serverEndPoint)
if err != nil {
- logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.apiEndPoint})
+ logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.serverEndPoint})
}
}()
}
@@ -448,36 +422,36 @@
case eventDisconnected:
if p != nil {
- p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
+ p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusNotReady)
}
c.stateLock.RLock()
- logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
+ logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.serverEndPoint, "curr-state": c.state})
c.stateLock.RUnlock()
// Try to connect again
c.events <- eventConnecting
case eventStopped:
- logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.apiEndPoint})
+ logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.serverEndPoint})
go func() {
if err := c.closeConnection(ctx, p); err != nil {
- logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+ logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.serverEndPoint, "error": err})
}
}()
break loop
case eventError:
- logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.serverEndPoint})
default:
- logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.apiEndPoint, "error": event})
+ logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.serverEndPoint, "error": event})
}
}
}
- logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.serverEndPoint})
}
func (c *Client) connectToEndpoint(ctx context.Context, handler SetAndTestServiceHandler, p *probe.Probe) error {
if p != nil {
- p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusPreparing)
+ p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusPreparing)
}
c.connectionLock.Lock()
@@ -494,7 +468,7 @@
// 1. automatically inject
// 2. publish Open Tracing Spans by this GRPC Client
// 3. detect connection failure on client calls such that the reconnection process can begin
- conn, err := grpc.Dial(c.apiEndPoint,
+ conn, err := grpc.Dial(c.serverEndPoint,
grpc.WithInsecure(),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
@@ -514,33 +488,33 @@
if err == nil {
subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
defer cancel()
- svc := handler(subCtx, conn)
+ svc := handler(subCtx, conn, &common.Connection{Endpoint: c.clientEndpoint, KeepAliveInterval: int64(c.monitorInterval)})
if svc != nil {
c.connection = conn
c.service = svc
if p != nil {
- p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusRunning)
+ p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusRunning)
}
- logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.serverEndPoint})
c.events <- eventConnected
return nil
}
}
logger.Warnw(ctx, "Failed to connect to endpoint",
log.Fields{
- "endpoint": c.apiEndPoint,
+ "endpoint": c.serverEndPoint,
"error": err,
})
if p != nil {
- p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusFailed)
+ p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusFailed)
}
- return fmt.Errorf("no connection to endpoint %s", c.apiEndPoint)
+ return fmt.Errorf("no connection to endpoint %s", c.serverEndPoint)
}
func (c *Client) closeConnection(ctx context.Context, p *probe.Probe) error {
if p != nil {
- p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusStopped)
+ p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusStopped)
}
c.connectionLock.Lock()
@@ -563,7 +537,7 @@
c.events <- eventStopped
close(c.events)
}
- logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.apiEndPoint})
+ logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.serverEndPoint})
}
// SetService is used for testing only
diff --git a/pkg/grpc/client_test.go b/pkg/grpc/client_test.go
index 0b880f1..440c137 100644
--- a/pkg/grpc/client_test.go
+++ b/pkg/grpc/client_test.go
@@ -24,7 +24,6 @@
"testing"
"time"
- "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
"github.com/opencord/voltha-protos/v5/go/common"
@@ -131,32 +130,33 @@
return nil
}
- tc.client, err = NewClient(apiEndpoint,
- handler,
- ActivityCheck(true))
+ tc.client, err = NewClient(
+ "test-endpoint",
+ apiEndpoint,
+ handler)
if err != nil {
return nil
}
return tc
}
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
if conn == nil {
return nil
}
svc := core_service.NewCoreServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+ if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
return nil
}
return svc
}
-func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func idleConnectionTest(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
if conn == nil {
return nil
}
svc := core_service.NewCoreServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+ if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
return nil
}
testForNoActivityCh <- time.Now()
@@ -447,9 +447,9 @@
assert.Nil(t, err)
// Create a new client
tc.client, err = NewClient(
+ "test-ednpoint",
apiEndpoint,
- serverRestarted,
- ActivityCheck(true))
+ serverRestarted)
assert.Nil(t, err)
probeCtx := context.WithValue(ctx, probe.ProbeContextKey, tc.probe)
go tc.client.Start(probeCtx, idleConnectionTest)
diff --git a/pkg/grpc/mock_core_service.go b/pkg/grpc/mock_core_service.go
index 015f667..22becce 100644
--- a/pkg/grpc/mock_core_service.go
+++ b/pkg/grpc/mock_core_service.go
@@ -131,6 +131,6 @@
return &empty.Empty{}, nil
}
-func (handler *MockCoreServiceHandler) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
+func (handler *MockCoreServiceHandler) GetHealthStatus(ctx context.Context, conn *common.Connection) (*health.HealthStatus, error) {
return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
}