[VOL-4514] Addressing device reconciliation failure

See comments on https://jira.opencord.org/browse/VOL-4514

This change depends on the related voltha-protos and voltha-lib-go
changes, which must be merged first. Until then, Jenkins will fail.

Change-Id: I597eff075fc5c810e914d5685f9603dac2de8e78
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index c356d06..f0da31b 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -21,6 +21,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"hash/fnv"
 	"sync"
 	"time"
 
@@ -32,6 +33,8 @@
 	"github.com/opencord/voltha-protos/v5/go/health"
 	"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
@@ -52,6 +55,11 @@
 
 var onuKvStorePathPrefixes = []string{cmn.CBasePathOnuKVStore, pmmgr.CPmKvStorePrefixBase}
 
+type reachabilityFromRemote struct {
+	lastKeepAlive     time.Time // when the latest keep-alive from the remote was recorded (time.Now() at update)
+	keepAliveInterval int64     // copied from the remote's Connection.KeepAliveInterval; fed to time.Duration when checked, so presumably nanoseconds - confirm
+}
+
 //OpenONUAC structure holds the ONU core information
 type OpenONUAC struct {
 	deviceHandlers              map[string]*deviceHandler
@@ -60,6 +68,8 @@
 	coreClient                  *vgrpc.Client
 	parentAdapterClients        map[string]*vgrpc.Client
 	lockParentAdapterClients    sync.RWMutex
+	reachableFromRemote         map[string]*reachabilityFromRemote
+	lockReachableFromRemote     sync.RWMutex
 	eventProxy                  eventif.EventProxy
 	kvClient                    kvstore.Client
 	cm                          *conf.ConfigManager
@@ -74,19 +84,18 @@
 	HeartbeatCheckInterval      time.Duration
 	HeartbeatFailReportInterval time.Duration
 	AcceptIncrementalEvto       bool
-	//GrpcTimeoutInterval         time.Duration
-	pSupportedFsms             *cmn.OmciDeviceFsms
-	maxTimeoutInterAdapterComm time.Duration
-	maxTimeoutReconciling      time.Duration
-	pDownloadManager           *swupg.AdapterDownloadManager
-	pFileManager               *swupg.FileDownloadManager //let coexist 'old and new' DownloadManager as long as 'old' does not get obsolete
-	MetricsEnabled             bool
-	mibAuditInterval           time.Duration
-	omciTimeout                int // in seconds
-	alarmAuditInterval         time.Duration
-	dlToOnuTimeout4M           time.Duration
-	rpcTimeout                 time.Duration
-	maxConcurrentFlowsPerUni   int
+	pSupportedFsms              *cmn.OmciDeviceFsms
+	maxTimeoutInterAdapterComm  time.Duration
+	maxTimeoutReconciling       time.Duration
+	pDownloadManager            *swupg.AdapterDownloadManager
+	pFileManager                *swupg.FileDownloadManager //let coexist 'old and new' DownloadManager as long as 'old' does not get obsolete
+	MetricsEnabled              bool
+	mibAuditInterval            time.Duration
+	omciTimeout                 int // in seconds
+	alarmAuditInterval          time.Duration
+	dlToOnuTimeout4M            time.Duration
+	rpcTimeout                  time.Duration
+	maxConcurrentFlowsPerUni    int
 }
 
 //NewOpenONUAC returns a new instance of OpenONU_AC
@@ -97,6 +106,7 @@
 	openOnuAc.deviceHandlers = make(map[string]*deviceHandler)
 	openOnuAc.deviceHandlersCreateChan = make(map[string]chan bool)
 	openOnuAc.parentAdapterClients = make(map[string]*vgrpc.Client)
+	openOnuAc.reachableFromRemote = make(map[string]*reachabilityFromRemote)
 	openOnuAc.mutexDeviceHandlersMap = sync.RWMutex{}
 	openOnuAc.config = cfg
 	openOnuAc.cm = cm
@@ -216,7 +226,10 @@
 }
 
 // GetHealthStatus is used as a service readiness validation as a grpc connection
-func (oo *OpenONUAC) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
+func (oo *OpenONUAC) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
+	// Each health probe doubles as a keep-alive: record it against the calling remote (clientConn)
+	oo.updateReachabilityFromRemote(ctx, clientConn)
+
 	return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
 }
 
@@ -249,7 +262,13 @@
 		logger.Warn(ctx, "reconcile-device-voltha-device-is-nil")
 		return nil, errors.New("nil-device")
 	}
-	logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id})
+	logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
+
+	// Check whether the grpc client in the adapter of the parent device can reach us yet
+	if !oo.isReachableFromRemote(device.ProxyAddress.AdapterEndpoint, device.ParentId) {
+		return nil, status.Errorf(codes.Unavailable, "adapter-not-reachable-from-parent-%s", device.ProxyAddress.AdapterEndpoint)
+	}
+
 	var handler *deviceHandler
 	if handler = oo.getDeviceHandler(ctx, device.Id, false); handler == nil {
 		handler := newDeviceHandler(ctx, oo.coreClient, oo.eventProxy, device, oo)
@@ -964,6 +983,38 @@
  * Parent GRPC clients
  */
 
+func getHash(endpoint, contextInfo string) string {
+	h := fnv.New128() // the 16-byte FNV-1 digest of endpoint+contextInfo keys reachableFromRemote
+	_, _ = h.Write([]byte(endpoint + contextInfo))
+	return string(h.Sum(nil)) // Sum(b) appends the digest to b and never hashes b, hence Write above
+}
+
+// updateReachabilityFromRemote records a keep-alive from remote: it stores the current time and the
+// remote's keep-alive interval under the key getHash(remote.Endpoint, remote.ContextInfo).
+func (oo *OpenONUAC) updateReachabilityFromRemote(ctx context.Context, remote *common.Connection) {
+	logger.Debugw(ctx, "updating-remote-connection-status", log.Fields{"remote": remote})
+	endpointHash := getHash(remote.Endpoint, remote.ContextInfo)
+	oo.lockReachableFromRemote.Lock()
+	defer oo.lockReachableFromRemote.Unlock()
+	if _, known := oo.reachableFromRemote[endpointHash]; !known {
+		logger.Debugw(ctx, "initial-remote-connection", log.Fields{"remote": remote})
+	}
+	// Store a fresh entry, overwriting any earlier keep-alive recorded for this remote
+	oo.reachableFromRemote[endpointHash] = &reachabilityFromRemote{lastKeepAlive: time.Now(), keepAliveInterval: remote.KeepAliveInterval}
+}
+
+// isReachableFromRemote reports whether a keep-alive from (endpoint, contextInfo) arrived recently enough.
+func (oo *OpenONUAC) isReachableFromRemote(endpoint string, contextInfo string) bool {
+	oo.lockReachableFromRemote.RLock()
+	defer oo.lockReachableFromRemote.RUnlock()
+	entry, found := oo.reachableFromRemote[getHash(endpoint, contextInfo)]
+	if !found {
+		return false // no keep-alive has ever been recorded for this remote
+	}
+	// Treat the remote as unreachable once two keep-alive intervals pass without a keep-alive
+	return time.Since(entry.lastKeepAlive) <= time.Duration(entry.keepAliveInterval*2)
+}
+
 func (oo *OpenONUAC) setupParentInterAdapterClient(ctx context.Context, endpoint string) error {
 	logger.Infow(ctx, "setting-parent-adapter-connection", log.Fields{"parent-endpoint": endpoint})
 	oo.lockParentAdapterClients.Lock()
@@ -972,9 +1023,10 @@
 		return nil
 	}
 
-	childClient, err := vgrpc.NewClient(endpoint,
-		oo.oltAdapterRestarted,
-		vgrpc.ActivityCheck(true))
+	childClient, err := vgrpc.NewClient(
+		oo.config.AdapterEndpoint,
+		endpoint,
+		oo.oltAdapterRestarted)
 
 	if err != nil {
 		return err
@@ -982,7 +1034,7 @@
 
 	oo.parentAdapterClients[endpoint] = childClient
 
-	go oo.parentAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestAdapterServiceHandler)
+	go oo.parentAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestOltInterAdapterServiceHandler)
 
 	// Wait until we have a connection to the child adapter.
 	// Unlimited retries or until context expires
@@ -1037,10 +1089,10 @@
 	return nil
 }
 
-// setAndTestAdapterServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+// setAndTestOltInterAdapterServiceHandler is used to test whether the remote gRPC service is up
+func setAndTestOltInterAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
 	svc := olt_inter_adapter_service.NewOltInterAdapterServiceClient(conn)
-	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+	if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
 		return nil
 	}
 	return svc