[VOL-4514] Address device reconciliation failure
See comments on https://jira.opencord.org/browse/VOL-4514
This change depends on the related voltha-protos and voltha-lib-go
changes being merged first. Until then, Jenkins will fail.
Change-Id: I8d99c3619d630677d402b9fb4b4f0bc22dd9a9f0
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index c164826..d030e72 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -22,10 +22,10 @@
"sync"
"time"
- "github.com/golang/protobuf/ptypes/empty"
vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-protos/v5/go/adapter_service"
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
@@ -40,32 +40,35 @@
adapterLock sync.RWMutex
onAdapterRestart vgrpc.RestartedHandler
liveProbeInterval time.Duration
+ coreEndpoint string
}
-func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
svc := adapter_service.NewAdapterServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
- logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": h})
+ if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
+ logger.Debugw(ctx, "remote-connection-not-ready", log.Fields{"error": err, "health": h, "requester": clientConn, "target": conn.Target()})
return nil
}
return svc
}
-func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
+func newAdapterAgent(coreEndpoint string, adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
return &agent{
adapter: adapter,
onAdapterRestart: onAdapterRestart,
adapterAPIEndPoint: adapter.Endpoint,
liveProbeInterval: liveProbeInterval,
+ coreEndpoint: coreEndpoint,
}
}
func (aa *agent) start(ctx context.Context) error {
// Establish grpc connection to Core
var err error
- if aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint,
- aa.onAdapterRestart,
- vgrpc.ActivityCheck(true)); err != nil {
+ if aa.vClient, err = vgrpc.NewClient(
+ aa.coreEndpoint,
+ aa.adapterAPIEndPoint,
+ aa.onAdapterRestart); err != nil {
return err
}
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 790a670..258ff2a 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -52,6 +52,7 @@
lockDeviceTypesMap sync.RWMutex
lockAdapterEndPointsMap sync.RWMutex
liveProbeInterval time.Duration
+ coreEndpoint string
}
// SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
@@ -60,6 +61,7 @@
}
func NewAdapterManager(
+ coreEndpoint string,
dbPath *model.Path,
coreInstanceID string,
backend *db.Backend,
@@ -73,6 +75,7 @@
adapterEndpoints: make(map[Endpoint]*agent),
endpointMgr: NewEndpointManager(backend),
liveProbeInterval: liveProbeInterval,
+ coreEndpoint: coreEndpoint,
}
}
@@ -178,7 +181,7 @@
// Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
// This handler just log the restart event. The actual action taken following an adapter restart
// will be done when an adapter re-registers itself.
- aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
}
return nil
diff --git a/rw_core/core/api/grpc_nbi_handler.go b/rw_core/core/api/grpc_nbi_handler.go
index eb80522..8adcdc5 100755
--- a/rw_core/core/api/grpc_nbi_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -26,7 +26,6 @@
"github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v7/pkg/version"
"github.com/opencord/voltha-protos/v5/go/common"
- "github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/omci"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
@@ -50,10 +49,6 @@
}
}
-func (handler *APIHandler) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
- return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
-}
-
// GetVoltha currently just returns version information
func (handler *APIHandler) GetVoltha(ctx context.Context, _ *empty.Empty) (*voltha.Voltha, error) {
logger.Debug(ctx, "GetVoltha")
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0fee3e3..83282e3 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -141,7 +141,7 @@
dbPath := model.NewDBPath(backend)
// load adapters & device types while other things are starting
- adapterMgr := adapter.NewAdapterManager(dbPath, id, backend, cf.LiveProbeInterval)
+ adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval)
adapterMgr.Start(ctx, adapterService)
// create the core of the system, the device managers
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 1adf88c..2c69b28 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -1367,11 +1367,15 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
+ logger.Infow(ctx, "aborting-current-running-requests-after-wait", log.Fields{"device-id": agent.deviceID})
+
defer agent.requestQueue.RequestComplete()
// If any reconciling is in progress just abort it. The adapter is gone.
agent.stopReconcile()
+ logger.Infow(ctx, "aborting-current-running-requests-after-sendstop", log.Fields{"device-id": agent.deviceID})
+
// Update the Core device transient state accordingly
var updatedState core.DeviceTransientState_Types
switch agent.getTransientState() {
@@ -1515,7 +1519,7 @@
//making here to keep lifecycle of this channel within the scope of retryReconcile
agent.stopReconcilingMutex.Lock()
- agent.stopReconciling = make(chan int)
+ agent.stopReconciling = make(chan int, 1)
agent.stopReconcilingMutex.Unlock()
// defined outside the retry loop so it can be cleaned
@@ -1591,6 +1595,8 @@
break retry
}
+ logger.Debugw(ctx, "reconcile-retry-ends", log.Fields{"adapter-endpoint": agent.adapterEndpoint})
+
// Retry loop is broken, so stop any timers and drain the channel
if backoffTimer != nil && !backoffTimer.Stop() {
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 5cc1f26..354f306 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -131,7 +131,7 @@
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
proxy := model.NewDBPath(backend)
- dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, backend, 5)
+ dat.adapterMgr = adapter.NewAdapterManager("test-endpoint", proxy, dat.coreInstanceID, backend, 5)
eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, cfg, dat.coreInstanceID, eventProxy)
dat.adapterMgr.Start(context.Background(), "agent-test")
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index e611019..19ed77c 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -154,7 +154,7 @@
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
proxy := model.NewDBPath(backend)
- adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, backend, 5)
+ adapterMgr := adapter.NewAdapterManager("test-endpoint", proxy, lda.coreInstanceID, backend, 5)
eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
adapterMgr.Start(context.Background(), "logical-test")
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index b518b2a..dcf6599 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -23,11 +23,17 @@
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
+ "github.com/opencord/voltha-protos/v5/go/health"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
+func (dMgr *Manager) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
+ logger.Debugw(ctx, "get-health-status-from-remote", log.Fields{"remote-client": clientConn})
+ return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
+}
+
func (dMgr *Manager) PortCreated(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "PortCreated")
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 5b8cc1b..b99ca52 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -169,16 +169,16 @@
ta.Probe.UpdateStatus(ctx, serviceName, probe.ServiceStatusStopped)
}
-func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
svc := core_service.NewCoreServiceClient(conn)
- if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != health.HealthStatus_HEALTHY {
+ if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
return nil
}
return svc
}
// gRPC service
-func (ta *Adapter) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*health.HealthStatus, error) {
+func (ta *Adapter) GetHealthStatus(ctx context.Context, clientConn *common.Connection) (*health.HealthStatus, error) {
return &health.HealthStatus{State: health.HealthStatus_HEALTHY}, nil
}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 9bfb2d8..96dfb33 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -91,9 +91,10 @@
go oltA.startGRPCService(ctx, oltA.grpcServer, oltA, "olt-grpc-service")
// Establish grpc connection to Core
- if oltA.coreClient, err = vgrpc.NewClient(oltA.coreEnpoint,
- oltA.oltRestarted,
- vgrpc.ActivityCheck(true)); err != nil {
+ if oltA.coreClient, err = vgrpc.NewClient(
+ "olt-endpoint",
+ oltA.coreEnpoint,
+ oltA.oltRestarted); err != nil {
logger.Fatal(ctx, "grpc-client-not-created")
}
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 7712b23..d5669c3 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -86,9 +86,10 @@
go onuA.startGRPCService(ctx, onuA.grpcServer, onuA, "onu-grpc-service")
// Establish grpc connection to Core
- if onuA.coreClient, err = vgrpc.NewClient(onuA.coreEnpoint,
- onuA.onuRestarted,
- vgrpc.ActivityCheck(true)); err != nil {
+ if onuA.coreClient, err = vgrpc.NewClient(
+ "onu-endpoint",
+ onuA.coreEnpoint,
+ onuA.onuRestarted); err != nil {
logger.Fatal(ctx, "grpc-client-not-created")
}
go onuA.coreClient.Start(probeCtx, setAndTestCoreServiceHandler)
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index a8f9a0e..2856744 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -2149,7 +2149,7 @@
}
func TestSuite(t *testing.T) {
- log.SetAllLogLevel(log.FatalLevel)
+ log.SetAllLogLevel(log.DebugLevel)
// Create a context to be cancelled at the end of all tests. This will trigger closing of any ressources used.
ctx, cancel := context.WithCancel(context.Background())