[VOL-4514] Addressing device reconciliation failure
See comments on https://jira.opencord.org/browse/VOL-4514
This change depends on the related voltha-protos and voltha-lib-go
changes being merged first. Until then, Jenkins verification will fail.
Change-Id: I597eff075fc5c810e914d5685f9603dac2de8e78
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index c356d06..f0da31b 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -21,6 +21,7 @@
"context"
"errors"
"fmt"
+ "hash/fnv"
"sync"
"time"
@@ -32,6 +33,8 @@
"github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
@@ -52,6 +55,11 @@
var onuKvStorePathPrefixes = []string{cmn.CBasePathOnuKVStore, pmmgr.CPmKvStorePrefixBase}
+type reachabilityFromRemote struct { // keep-alive bookkeeping for one remote endpoint/context pair
+ lastKeepAlive time.Time // local time at which the latest keep-alive from the remote was recorded
+ keepAliveInterval int64 // interval reported by the remote; converted directly to a time.Duration by isReachableFromRemote
+}
+
//OpenONUAC structure holds the ONU core information
type OpenONUAC struct {
deviceHandlers map[string]*deviceHandler
@@ -60,6 +68,8 @@
coreClient *vgrpc.Client
parentAdapterClients map[string]*vgrpc.Client
lockParentAdapterClients sync.RWMutex
+ reachableFromRemote map[string]*reachabilityFromRemote
+ lockReachableFromRemote sync.RWMutex
eventProxy eventif.EventProxy
kvClient kvstore.Client
cm *conf.ConfigManager
@@ -74,19 +84,18 @@
HeartbeatCheckInterval time.Duration
HeartbeatFailReportInterval time.Duration
AcceptIncrementalEvto bool
- //GrpcTimeoutInterval time.Duration
- pSupportedFsms *cmn.OmciDeviceFsms
- maxTimeoutInterAdapterComm time.Duration
- maxTimeoutReconciling time.Duration
- pDownloadManager *swupg.AdapterDownloadManager
- pFileManager *swupg.FileDownloadManager //let coexist 'old and new' DownloadManager as long as 'old' does not get obsolete
- MetricsEnabled bool
- mibAuditInterval time.Duration
- omciTimeout int // in seconds
- alarmAuditInterval time.Duration
- dlToOnuTimeout4M time.Duration
- rpcTimeout time.Duration
- maxConcurrentFlowsPerUni int
+ pSupportedFsms *cmn.OmciDeviceFsms
+ maxTimeoutInterAdapterComm time.Duration
+ maxTimeoutReconciling time.Duration
+ pDownloadManager *swupg.AdapterDownloadManager
+ pFileManager *swupg.FileDownloadManager //let coexist 'old and new' DownloadManager as long as 'old' does not get obsolete
+ MetricsEnabled bool
+ mibAuditInterval time.Duration
+ omciTimeout int // in seconds
+ alarmAuditInterval time.Duration
+ dlToOnuTimeout4M time.Duration
+ rpcTimeout time.Duration
+ maxConcurrentFlowsPerUni int
}
//NewOpenONUAC returns a new instance of OpenONU_AC
@@ -97,6 +106,7 @@
openOnuAc.deviceHandlers = make(map[string]*deviceHandler)
openOnuAc.deviceHandlersCreateChan = make(map[string]chan bool)
openOnuAc.parentAdapterClients = make(map[string]*vgrpc.Client)
+ openOnuAc.reachableFromRemote = make(map[string]*reachabilityFromRemote)
openOnuAc.mutexDeviceHandlersMap = sync.RWMutex{}
openOnuAc.config = cfg
openOnuAc.cm = cm
@@ -216,7 +226,10 @@
}
// GetHealthStatus is used as a service readiness validation as a grpc connection
-func (oo *OpenONUAC) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
+func (oo *OpenONUAC) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
+ // Treat each health probe as a keep-alive from the caller: refresh its reachability record, which the reconcile path consults via isReachableFromRemote
+ oo.updateReachabilityFromRemote(ctx, clientConn)
+
return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
}
@@ -249,7 +262,13 @@
logger.Warn(ctx, "reconcile-device-voltha-device-is-nil")
return nil, errors.New("nil-device")
}
- logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id})
+ logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
+
+ // Check whether the grpc client in the adapter of the parent device can reach us yet
+ if !oo.isReachableFromRemote(device.ProxyAddress.AdapterEndpoint, device.ParentId) {
+ return nil, status.Errorf(codes.Unavailable, "adapter-not-reachable-from-parent-%s", device.ProxyAddress.AdapterEndpoint)
+ }
+
var handler *deviceHandler
if handler = oo.getDeviceHandler(ctx, device.Id, false); handler == nil {
handler := newDeviceHandler(ctx, oo.coreClient, oo.eventProxy, device, oo)
@@ -964,6 +983,38 @@
* Parent GRPC clients
*/
+func getHash(endpoint, contextInfo string) string { // returns the raw 16-byte FNV-128 digest of endpoint+contextInfo, used as a map key
+	h := fnv.New128()
+	_, _ = h.Write([]byte(endpoint + contextInfo)) // hash.Hash.Write is documented never to return an error
+	return string(h.Sum(nil)) // Sum(nil) yields only the digest; Sum(b) merely appends the current digest to b without hashing b
+}
+
+func (oo *OpenONUAC) updateReachabilityFromRemote(ctx context.Context, remote *common.Connection) { // records a keep-alive received from remote
+	if remote == nil { // nothing to record; also avoids a nil dereference on remote.Endpoint below
+		return
+	}
+	logger.Debugw(ctx, "updating-remote-connection-status", log.Fields{"remote": remote}) // log with the caller's ctx rather than a detached background context
+	oo.lockReachableFromRemote.Lock()
+	defer oo.lockReachableFromRemote.Unlock()
+	endpointHash := getHash(remote.Endpoint, remote.ContextInfo)
+	if _, known := oo.reachableFromRemote[endpointHash]; !known {
+		logger.Debugw(ctx, "initial-remote-connection", log.Fields{"remote": remote})
+	}
+	oo.reachableFromRemote[endpointHash] = &reachabilityFromRemote{lastKeepAlive: time.Now(), keepAliveInterval: remote.KeepAliveInterval} // insert or refresh under the write lock
+}
+
+func (oo *OpenONUAC) isReachableFromRemote(endpoint string, contextInfo string) bool { // reports whether keep-alives from the given remote are still fresh
+	key := getHash(endpoint, contextInfo)
+	oo.lockReachableFromRemote.RLock()
+	defer oo.lockReachableFromRemote.RUnlock()
+	entry, seen := oo.reachableFromRemote[key]
+	if !seen {
+		return false // no keep-alive has been recorded for this remote
+	}
+	// The remote counts as unreachable once more than two keep-alive intervals have passed without an update
+	return time.Since(entry.lastKeepAlive) <= time.Duration(entry.keepAliveInterval*2)
+}
+
func (oo *OpenONUAC) setupParentInterAdapterClient(ctx context.Context, endpoint string) error {
logger.Infow(ctx, "setting-parent-adapter-connection", log.Fields{"parent-endpoint": endpoint})
oo.lockParentAdapterClients.Lock()
@@ -972,9 +1023,10 @@
return nil
}
- childClient, err := vgrpc.NewClient(endpoint,
- oo.oltAdapterRestarted,
- vgrpc.ActivityCheck(true))
+ childClient, err := vgrpc.NewClient(
+ oo.config.AdapterEndpoint,
+ endpoint,
+ oo.oltAdapterRestarted)
if err != nil {
return err
@@ -982,7 +1034,7 @@
oo.parentAdapterClients[endpoint] = childClient
- go oo.parentAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestAdapterServiceHandler)
+ go oo.parentAdapterClients[endpoint].Start(log.WithSpanFromContext(context.TODO(), ctx), setAndTestOltInterAdapterServiceHandler)
// Wait until we have a connection to the child adapter.
// Unlimited retries or until context expires
@@ -1037,10 +1089,10 @@
return nil
}
-// setAndTestAdapterServiceHandler is used to test whether the remote gRPC service is up
-func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+// setAndTestOltInterAdapterServiceHandler is used to test whether the remote gRPC service is up
+func setAndTestOltInterAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
svc := olt_inter_adapter_service.NewOltInterAdapterServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+ if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
return nil
}
return svc