[VOL-4514] Addressing device reconciliation failure

See comments on https://jira.opencord.org/browse/VOL-4514

This change depends on the related voltha-protos and voltha-lib-go
changes being merged first; until then, Jenkins will fail.
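
At the heart of this change is the health-probe handshake: GetHealthStatus
now takes a common.Connection identifying the probing peer instead of an
empty.Empty, so each side of the core/adapter link can log and verify who
is checking on it. A minimal sketch of the server side (the myService
receiver is illustrative, not part of this change; the handler added in
rw_core/core/device/manager_sbi.go below is the real instance):

    // Answer a liveness probe; clientConn identifies the remote client
    // that is asking, which the handler can include in its logs.
    func (s *myService) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
        logger.Debugw(ctx, "get-health-status-from-remote", log.Fields{"remote-client": clientConn})
        return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
    }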

Change-Id: I8d99c3619d630677d402b9fb4b4f0bc22dd9a9f0
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index c164826..d030e72 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -22,10 +22,10 @@
 	"sync"
 	"time"
 
-	"github.com/golang/protobuf/ptypes/empty"
 	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-protos/v5/go/adapter_service"
+	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
@@ -40,32 +40,35 @@
 	adapterLock        sync.RWMutex
 	onAdapterRestart   vgrpc.RestartedHandler
 	liveProbeInterval  time.Duration
+	coreEndpoint       string
 }
 
-func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
 	svc := adapter_service.NewAdapterServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
-		logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": h})
+	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+		logger.Debugw(ctx, "remote-connection-not-ready", log.Fields{"error": err, "health": h, "requester": clientConn, "target": conn.Target()})
 		return nil
 	}
 	return svc
 }
 
-func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
+func newAdapterAgent(coreEndpoint string, adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
 	return &agent{
 		adapter:            adapter,
 		onAdapterRestart:   onAdapterRestart,
 		adapterAPIEndPoint: adapter.Endpoint,
 		liveProbeInterval:  liveProbeInterval,
+		coreEndpoint:       coreEndpoint,
 	}
 }
 
 func (aa *agent) start(ctx context.Context) error {
 	// Establish grpc connection to the adapter
 	var err error
-	if aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint,
-		aa.onAdapterRestart,
-		vgrpc.ActivityCheck(true)); err != nil {
+	if aa.vClient, err = vgrpc.NewClient(
+		aa.coreEndpoint,
+		aa.adapterAPIEndPoint,
+		aa.onAdapterRestart); err != nil {
 		return err
 	}
 
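With the updated voltha-lib-go API, vgrpc.NewClient takes the caller's own
endpoint first and the remote endpoint second, and every call site in this
change drops the vgrpc.ActivityCheck option. A before/after sketch
(endpoint and handler names illustrative):

    // Before: only the remote endpoint, with activity checking enabled
    // through an option.
    client, err := vgrpc.NewClient(remoteEndpoint, onRestart, vgrpc.ActivityCheck(true))

    // After: the local endpoint comes first, so the client can identify
    // itself to the server when it probes for health.
    client, err := vgrpc.NewClient(localEndpoint, remoteEndpoint, onRestart)
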
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 790a670..258ff2a 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -52,6 +52,7 @@
 	lockDeviceTypesMap      sync.RWMutex
 	lockAdapterEndPointsMap sync.RWMutex
 	liveProbeInterval       time.Duration
+	coreEndpoint            string
 }
 
 // SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
@@ -60,6 +61,7 @@
 }
 
 func NewAdapterManager(
+	coreEndpoint string,
 	dbPath *model.Path,
 	coreInstanceID string,
 	backend *db.Backend,
@@ -73,6 +75,7 @@
 		adapterEndpoints:  make(map[Endpoint]*agent),
 		endpointMgr:       NewEndpointManager(backend),
 		liveProbeInterval: liveProbeInterval,
+		coreEndpoint:      coreEndpoint,
 	}
 }
 
@@ -178,7 +181,7 @@
 		// Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
 	// This handler just logs the restart event.  The actual action taken following an adapter restart
 		// will be done when an adapter re-registers itself.
-		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
 		aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
 	}
 	return nil
diff --git a/rw_core/core/api/grpc_nbi_handler.go b/rw_core/core/api/grpc_nbi_handler.go
index eb80522..8adcdc5 100755
--- a/rw_core/core/api/grpc_nbi_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -26,7 +26,6 @@
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	"github.com/opencord/voltha-lib-go/v7/pkg/version"
 	"github.com/opencord/voltha-protos/v5/go/common"
-	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/omci"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
@@ -50,10 +49,6 @@
 	}
 }
 
-func (handler *APIHandler) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
-	return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
-}
-
 // GetVoltha currently just returns version information
 func (handler *APIHandler) GetVoltha(ctx context.Context, _ *empty.Empty) (*voltha.Voltha, error) {
 	logger.Debug(ctx, "GetVoltha")
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0fee3e3..83282e3 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -141,7 +141,7 @@
 	dbPath := model.NewDBPath(backend)
 
 	// load adapters & device types while other things are starting
-	adapterMgr := adapter.NewAdapterManager(dbPath, id, backend, cf.LiveProbeInterval)
+	adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval)
 	adapterMgr.Start(ctx, adapterService)
 
 	// create the core of the system, the device managers
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 1adf88c..2c69b28 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -1367,11 +1367,15 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+	logger.Infow(ctx, "aborting-current-running-requests-after-wait", log.Fields{"device-id": agent.deviceID})
+
 	defer agent.requestQueue.RequestComplete()
 
 	// If any reconciling is in progress just abort it. The adapter is gone.
 	agent.stopReconcile()
 
+	logger.Infow(ctx, "aborting-current-running-requests-after-sendstop", log.Fields{"device-id": agent.deviceID})
+
 	// Update the Core device transient state accordingly
 	var updatedState core.DeviceTransientState_Types
 	switch agent.getTransientState() {
@@ -1515,7 +1519,7 @@
 
 	// Created here to keep the lifecycle of this channel within the scope of retryReconcile
 	agent.stopReconcilingMutex.Lock()
-	agent.stopReconciling = make(chan int)
+	agent.stopReconciling = make(chan int, 1)
 	agent.stopReconcilingMutex.Unlock()
 
 	// defined outside the retry loop so it can be cleaned
@@ -1591,6 +1595,8 @@
 		break retry
 	}
 
+	logger.Debugw(ctx, "reconcile-retry-ends", log.Fields{"adapter-endpoint": agent.adapterEndpoint})
+
 	// Retry loop is broken, so stop any timers and drain the channel
 	if backoffTimer != nil && !backoffTimer.Stop() {
 
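The stopReconciling channel is now created with a buffer of one, presumably
so the stop signal can be delivered without blocking even when the retry
loop is not at its receive point. A minimal sketch of that pattern (names
illustrative, not the actual stopReconcile implementation):

    stop := make(chan int, 1)

    // Stopper side: the buffered slot makes this send non-blocking.
    select {
    case stop <- 0:
    default: // a stop signal is already pending
    }

    // Loop side: poll for the signal between retries.
    select {
    case <-stop:
        // abort reconciliation
    default:
    }
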
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 5cc1f26..354f306 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -131,7 +131,7 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
 
 	proxy := model.NewDBPath(backend)
-	dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, backend, 5)
+	dat.adapterMgr = adapter.NewAdapterManager("test-endpoint", proxy, dat.coreInstanceID, backend, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
 	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, cfg, dat.coreInstanceID, eventProxy)
 	dat.adapterMgr.Start(context.Background(), "agent-test")
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index e611019..19ed77c 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -154,7 +154,7 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
 
 	proxy := model.NewDBPath(backend)
-	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, backend, 5)
+	adapterMgr := adapter.NewAdapterManager("test-endpoint", proxy, lda.coreInstanceID, backend, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
 	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
 	adapterMgr.Start(context.Background(), "logical-test")
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index b518b2a..dcf6599 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -23,11 +23,17 @@
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-protos/v5/go/common"
 	ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
+	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+func (dMgr *Manager) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
+	logger.Debugw(ctx, "get-health-status-from-remote", log.Fields{"remote-client": clientConn})
+	return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
+}
+
 func (dMgr *Manager) PortCreated(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
 	ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "PortCreated")
 
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 5b8cc1b..b99ca52 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -169,16 +169,16 @@
 	ta.Probe.UpdateStatus(ctx, serviceName, probe.ServiceStatusStopped)
 }
 
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
 	svc := core_service.NewCoreServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
 		return nil
 	}
 	return svc
 }
 
 // gRPC service
-func (ta *Adapter) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
+func (ta *Adapter) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
 	return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
 }
 
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 9bfb2d8..96dfb33 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -91,9 +91,10 @@
 	go oltA.startGRPCService(ctx, oltA.grpcServer, oltA, "olt-grpc-service")
 
 	// Establish grpc connection to Core
-	if oltA.coreClient, err = vgrpc.NewClient(oltA.coreEnpoint,
-		oltA.oltRestarted,
-		vgrpc.ActivityCheck(true)); err != nil {
+	if oltA.coreClient, err = vgrpc.NewClient(
+		"olt-endpoint",
+		oltA.coreEnpoint,
+		oltA.oltRestarted); err != nil {
 		logger.Fatal(ctx, "grpc-client-not-created")
 	}
 
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 7712b23..d5669c3 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -86,9 +86,10 @@
 	go onuA.startGRPCService(ctx, onuA.grpcServer, onuA, "onu-grpc-service")
 
 	// Establish grpc connection to Core
-	if onuA.coreClient, err = vgrpc.NewClient(onuA.coreEnpoint,
-		onuA.onuRestarted,
-		vgrpc.ActivityCheck(true)); err != nil {
+	if onuA.coreClient, err = vgrpc.NewClient(
+		"onu-endpoint",
+		onuA.coreEnpoint,
+		onuA.onuRestarted); err != nil {
 		logger.Fatal(ctx, "grpc-client-not-created")
 	}
 	go onuA.coreClient.Start(probeCtx, setAndTestCoreServiceHandler)
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index a8f9a0e..2856744 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -2149,7 +2149,7 @@
 }
 
 func TestSuite(t *testing.T) {
-	log.SetAllLogLevel(log.FatalLevel)
+	log.SetAllLogLevel(log.DebugLevel)
 
 	// Create a context to be cancelled at the end of all tests.  This will trigger closing of any resources used.
 	ctx, cancel := context.WithCancel(context.Background())