[VOL-4514] Addressing device reconciliation failure

See comments on https://jira.opencord.org/browse/VOL-4514

This change is dependent on the related proto and voltha lib go
changes to be merged first.  Until then jenkins will fail.

Change-Id: I8d99c3619d630677d402b9fb4b4f0bc22dd9a9f0
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
index add2b28..9b66d85 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
@@ -27,6 +27,7 @@
 	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/core_service"
 	"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
 	"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
@@ -36,7 +37,7 @@
 
 type event byte
 type state byte
-type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn) interface{}
+type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn, *common.Connection) interface{}
 type RestartedHandler func(ctx context.Context, endPoint string) error
 
 type contextKey string
@@ -83,7 +84,8 @@
 )
 
 type Client struct {
-	apiEndPoint            string
+	clientEndpoint         string
+	serverEndPoint         string
 	connection             *grpc.ClientConn
 	connectionLock         sync.RWMutex
 	stateLock              sync.RWMutex
@@ -94,25 +96,17 @@
 	backoffInitialInterval time.Duration
 	backoffMaxInterval     time.Duration
 	backoffMaxElapsedTime  time.Duration
-	activityCheck          bool
 	monitorInterval        time.Duration
-	activeCh               chan struct{}
-	activeChMutex          sync.RWMutex
 	done                   bool
 	livenessCallback       func(timestamp time.Time)
 }
 
 type ClientOption func(*Client)
 
-func ActivityCheck(enable bool) ClientOption {
-	return func(args *Client) {
-		args.activityCheck = enable
-	}
-}
-
-func NewClient(endpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
+func NewClient(clientEndpoint, serverEndpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
 	c := &Client{
-		apiEndPoint:            endpoint,
+		clientEndpoint:         clientEndpoint,
+		serverEndPoint:         serverEndpoint,
 		onRestart:              onRestart,
 		events:                 make(chan event, 1),
 		state:                  stateDisconnected,
@@ -156,7 +150,7 @@
 	c.connectionLock.RLock()
 	defer c.connectionLock.RUnlock()
 	if c.service == nil {
-		return nil, fmt.Errorf("no connection to %s", c.apiEndPoint)
+		return nil, fmt.Errorf("no connection to %s", c.serverEndPoint)
 	}
 	return c.service, nil
 }
@@ -167,7 +161,7 @@
 	c.connectionLock.RLock()
 	defer c.connectionLock.RUnlock()
 	if c.service == nil {
-		return nil, fmt.Errorf("no core connection to %s", c.apiEndPoint)
+		return nil, fmt.Errorf("no core connection to %s", c.serverEndPoint)
 	}
 	client, ok := c.service.(core_service.CoreServiceClient)
 	if ok {
@@ -182,7 +176,7 @@
 	c.connectionLock.RLock()
 	defer c.connectionLock.RUnlock()
 	if c.service == nil {
-		return nil, fmt.Errorf("no child adapter connection to %s", c.apiEndPoint)
+		return nil, fmt.Errorf("no child adapter connection to %s", c.serverEndPoint)
 	}
 	client, ok := c.service.(onu_inter_adapter_service.OnuInterAdapterServiceClient)
 	if ok {
@@ -197,7 +191,7 @@
 	c.connectionLock.RLock()
 	defer c.connectionLock.RUnlock()
 	if c.service == nil {
-		return nil, fmt.Errorf("no parent adapter connection to %s", c.apiEndPoint)
+		return nil, fmt.Errorf("no parent adapter connection to %s", c.serverEndPoint)
 	}
 	client, ok := c.service.(olt_inter_adapter_service.OltInterAdapterServiceClient)
 	if ok {
@@ -207,7 +201,7 @@
 }
 
 func (c *Client) Reset(ctx context.Context) {
-	logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.apiEndPoint})
+	logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.serverEndPoint})
 	c.stateLock.Lock()
 	defer c.stateLock.Unlock()
 	if c.state == stateConnected {
@@ -222,7 +216,7 @@
 	err := invoker(ctx, method, req, reply, cc, opts...)
 	// On connection failure, start the reconnect process depending on the error response
 	if err != nil {
-		logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.apiEndPoint})
+		logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.serverEndPoint})
 		if strings.Contains(err.Error(), connectionErrorSubString) ||
 			strings.Contains(err.Error(), connectionError) ||
 			strings.Contains(err.Error(), connectionSystemNotReady) ||
@@ -230,12 +224,12 @@
 			c.stateLock.Lock()
 			if c.state == stateConnected {
 				c.state = stateDisconnected
-				logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
+				logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.serverEndPoint, "error": err, "curr-state": stateConnected, "new-state": c.state})
 				c.events <- eventDisconnected
 			}
 			c.stateLock.Unlock()
 		} else if strings.Contains(err.Error(), connectionClosedSubstring) {
-			logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+			logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.serverEndPoint, "error": err})
 		}
 		return err
 	}
@@ -244,24 +238,17 @@
 	return nil
 }
 
-// updateActivity pushes an activity indication on the channel so that the monitoring routine does not validate
-// the gRPC connection when the connection is being used. Note that this update is done both when the connection
-// is alive or a connection error is returned. A separate routine takes care of doing the re-connect.
+// updateActivity updates the liveness channel
 func (c *Client) updateActivity(ctx context.Context) {
-	c.activeChMutex.RLock()
-	defer c.activeChMutex.RUnlock()
-	if c.activeCh != nil {
-		logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.apiEndPoint})
-		c.activeCh <- struct{}{}
+	logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.serverEndPoint})
 
-		// Update liveness only in connected state
-		if c.livenessCallback != nil {
-			c.stateLock.RLock()
-			if c.state == stateConnected {
-				c.livenessCallback(time.Now())
-			}
-			c.stateLock.RUnlock()
+	// Update liveness only in connected state
+	if c.livenessCallback != nil {
+		c.stateLock.RLock()
+		if c.state == stateConnected {
+			c.livenessCallback(time.Now())
 		}
+		c.stateLock.RUnlock()
 	}
 }
 
@@ -282,14 +269,7 @@
 // timeout, it will send a default API request on that connection.   If the connection is good then nothing
 // happens.  If it's bad this will trigger reconnection attempts.
 func (c *Client) monitorActivity(ctx context.Context, handler SetAndTestServiceHandler) {
-	logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.apiEndPoint})
-
-	// Create an activity monitor channel.  Unbuffered channel works well.  However, we use a buffered
-	// channel here as a safeguard of having the grpc interceptor publishing too many events that can be
-	// consumed by this monitoring thread
-	c.activeChMutex.Lock()
-	c.activeCh = make(chan struct{}, 10)
-	c.activeChMutex.Unlock()
+	logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.serverEndPoint})
 
 	grpcMonitorCheckRunning := false
 	var grpcMonitorCheckRunningLock sync.RWMutex
@@ -301,18 +281,14 @@
 		timeoutTimer := time.NewTimer(timeout)
 		select {
 
-		case <-c.activeCh:
-			logger.Debugw(ctx, "endpoint-reachable", log.Fields{"endpoint": c.apiEndPoint})
-
-			// Reset timer
+		case <-ctx.Done():
+			// Stop and drain timer
 			if !timeoutTimer.Stop() {
 				select {
 				case <-timeoutTimer.C:
 				default:
 				}
 			}
-
-		case <-ctx.Done():
 			break loop
 
 		case <-timeoutTimer.C:
@@ -328,21 +304,21 @@
 					grpcMonitorCheckRunningLock.Lock()
 					if grpcMonitorCheckRunning {
 						grpcMonitorCheckRunningLock.Unlock()
-						logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.apiEndPoint})
+						logger.Debugw(ctx, "connection-check-already-in-progress", log.Fields{"api-endpoint": c.serverEndPoint})
 						return
 					}
 					grpcMonitorCheckRunning = true
 					grpcMonitorCheckRunningLock.Unlock()
 
-					logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
+					logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.serverEndPoint})
 					subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
 					defer cancel()
 					subCtx = WithGrpcMonitorContext(subCtx, "grpc-monitor")
 					c.connectionLock.RLock()
 					defer c.connectionLock.RUnlock()
 					if c.connection != nil {
-						response := handler(subCtx, c.connection)
-						logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
+						response := handler(subCtx, c.connection, &common.Connection{Endpoint: c.clientEndpoint, KeepAliveInterval: int64(c.monitorInterval)})
+						logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.serverEndPoint, "up": response != nil})
 					}
 					grpcMonitorCheckRunningLock.Lock()
 					grpcMonitorCheckRunning = false
@@ -351,23 +327,21 @@
 			}
 		}
 	}
-	logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.apiEndPoint})
+	logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.serverEndPoint})
 }
 
 // Start kicks off the adapter agent by trying to connect to the adapter
 func (c *Client) Start(ctx context.Context, handler SetAndTestServiceHandler) {
-	logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.apiEndPoint})
+	logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.serverEndPoint})
 
 	// If the context contains a k8s probe then register services
 	p := probe.GetProbeFromContext(ctx)
 	if p != nil {
-		p.RegisterService(ctx, c.apiEndPoint)
+		p.RegisterService(ctx, c.serverEndPoint)
 	}
 
-	// Enable activity check, if required
-	if c.activityCheck {
-		go c.monitorActivity(ctx, handler)
-	}
+	// Enable activity check
+	go c.monitorActivity(ctx, handler)
 
 	initialConnection := true
 	c.events <- eventConnecting
@@ -377,22 +351,22 @@
 	for {
 		select {
 		case <-ctx.Done():
-			logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
+			logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.serverEndPoint})
 			break loop
 		case event := <-c.events:
-			logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+			logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.serverEndPoint})
 			c.connectionLock.RLock()
 			// On a client stopped, just allow the stop event to go through
 			if c.done && event != eventStopped {
 				c.connectionLock.RUnlock()
-				logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+				logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.serverEndPoint})
 				continue
 			}
 			c.connectionLock.RUnlock()
 			switch event {
 			case eventConnecting:
 				c.stateLock.Lock()
-				logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt, "curr-state": c.state})
+				logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.serverEndPoint, "attempts": attempt, "curr-state": c.state})
 				if c.state == stateConnected {
 					c.state = stateDisconnected
 				}
@@ -403,12 +377,12 @@
 							c.stateLock.Lock()
 							c.state = stateDisconnected
 							c.stateLock.Unlock()
-							logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.apiEndPoint, "attempt": attempt, "error": err})
+							logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.serverEndPoint, "attempt": attempt, "error": err})
 
 							// Retry connection after a delay
 							if err = backoff.Backoff(ctx); err != nil {
 								// Context has closed or reached maximum elapsed time, if set
-								logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+								logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.serverEndPoint, "error": err})
 								return
 							}
 							attempt += 1
@@ -427,19 +401,19 @@
 			case eventConnected:
 				attempt = 1
 				c.stateLock.Lock()
-				logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
+				logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.serverEndPoint, "curr-state": c.state})
 				if c.state != stateConnected {
 					c.state = stateConnected
 					if initialConnection {
-						logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.apiEndPoint})
+						logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.serverEndPoint})
 						initialConnection = false
 					} else {
-						logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.apiEndPoint})
+						logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.serverEndPoint})
 						// Trigger any callback on a restart
 						go func() {
-							err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.apiEndPoint)
+							err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.serverEndPoint)
 							if err != nil {
-								logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.apiEndPoint})
+								logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.serverEndPoint})
 							}
 						}()
 					}
@@ -448,36 +422,36 @@
 
 			case eventDisconnected:
 				if p != nil {
-					p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
+					p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusNotReady)
 				}
 				c.stateLock.RLock()
-				logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "curr-state": c.state})
+				logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.serverEndPoint, "curr-state": c.state})
 				c.stateLock.RUnlock()
 
 				// Try to connect again
 				c.events <- eventConnecting
 
 			case eventStopped:
-				logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.apiEndPoint})
+				logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.serverEndPoint})
 				go func() {
 					if err := c.closeConnection(ctx, p); err != nil {
-						logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+						logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.serverEndPoint, "error": err})
 					}
 				}()
 				break loop
 			case eventError:
-				logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.apiEndPoint})
+				logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.serverEndPoint})
 			default:
-				logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.apiEndPoint, "error": event})
+				logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.serverEndPoint, "error": event})
 			}
 		}
 	}
-	logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.apiEndPoint})
+	logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.serverEndPoint})
 }
 
 func (c *Client) connectToEndpoint(ctx context.Context, handler SetAndTestServiceHandler, p *probe.Probe) error {
 	if p != nil {
-		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusPreparing)
+		p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusPreparing)
 	}
 
 	c.connectionLock.Lock()
@@ -494,7 +468,7 @@
 	// 1. automatically inject
 	// 2. publish Open Tracing Spans by this GRPC Client
 	// 3. detect connection failure on client calls such that the reconnection process can begin
-	conn, err := grpc.Dial(c.apiEndPoint,
+	conn, err := grpc.Dial(c.serverEndPoint,
 		grpc.WithInsecure(),
 		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
 			grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
@@ -514,33 +488,33 @@
 	if err == nil {
 		subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
 		defer cancel()
-		svc := handler(subCtx, conn)
+		svc := handler(subCtx, conn, &common.Connection{Endpoint: c.clientEndpoint, KeepAliveInterval: int64(c.monitorInterval)})
 		if svc != nil {
 			c.connection = conn
 			c.service = svc
 			if p != nil {
-				p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusRunning)
+				p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusRunning)
 			}
-			logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.apiEndPoint})
+			logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.serverEndPoint})
 			c.events <- eventConnected
 			return nil
 		}
 	}
 	logger.Warnw(ctx, "Failed to connect to endpoint",
 		log.Fields{
-			"endpoint": c.apiEndPoint,
+			"endpoint": c.serverEndPoint,
 			"error":    err,
 		})
 
 	if p != nil {
-		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusFailed)
+		p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusFailed)
 	}
-	return fmt.Errorf("no connection to endpoint %s", c.apiEndPoint)
+	return fmt.Errorf("no connection to endpoint %s", c.serverEndPoint)
 }
 
 func (c *Client) closeConnection(ctx context.Context, p *probe.Probe) error {
 	if p != nil {
-		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusStopped)
+		p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusStopped)
 	}
 
 	c.connectionLock.Lock()
@@ -563,7 +537,7 @@
 		c.events <- eventStopped
 		close(c.events)
 	}
-	logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.apiEndPoint})
+	logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.serverEndPoint})
 }
 
 // SetService is used for testing only