[VOL-4381] Fix missing device unlock in the core when invoking
some image request. Also fixes a unit test issue.
Change-Id: I8afa4b4cb641509340e912d4f14ea93815135f71
diff --git a/VERSION b/VERSION
index 4a36342..05d78bc 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.0
+3.0.1-dev
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 0fda20e..2fa8dac 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -358,10 +358,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
device := agent.getDeviceReadOnlyWithoutLock()
if !agent.proceedWithRequest(device) {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
}
@@ -375,8 +375,10 @@
"device-type": agent.deviceType,
"adapter-endpoint": device.AdapterEndpoint,
})
+ agent.requestQueue.RequestComplete()
return nil, err
}
+ agent.requestQueue.RequestComplete()
return client.GetImageDownloadStatus(ctx, &ic.ImageDownloadMessage{
Device: device,
Image: img,
@@ -644,6 +646,7 @@
defer cancel()
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ agent.requestQueue.RequestComplete()
return client.GetOnuImageStatus(subCtx, request)
}
@@ -715,7 +718,6 @@
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
agent.requestQueue.RequestComplete()
-
return client.AbortOnuImageUpgrade(subCtx, request)
}
@@ -750,6 +752,7 @@
defer cancel()
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ agent.requestQueue.RequestComplete()
return client.CommitOnuImage(subCtx, request)
}
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index d65eb39..9a76069 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -1704,7 +1704,6 @@
}
func (nb *NBTest) testMPLSFlowsAddition(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
-
// Create and enable device with valid data
oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
assert.Nil(t, err)
@@ -1727,9 +1726,18 @@
nb.verifyLogicalDevices(t, oltDevice, nbi)
logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
- assert.NoError(t, err)
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ var logicalDevice *voltha.LogicalDevice
+ for _, ld := range logicalDevices.Items {
+ if ld.RootDeviceId == oltDevice.Id {
+ logicalDevice = ld
+ break
+ }
+ }
+ assert.NotNil(t, logicalDevice)
- testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevices.GetItems()[0]})
+ testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevice})
// Add a meter to the logical device, which the flow can refer to
meterMod := &ofp.OfpMeterMod{
Command: ofp.OfpMeterModCommand_OFPMC_ADD,
@@ -1749,14 +1757,14 @@
})
assert.NoError(t, err)
- meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevice.Id})
assert.NoError(t, err)
for _, item := range meters.GetItems() {
testLogger.Infow(getContext(), "list-logical-device-meters", log.Fields{"meter-config": item.GetConfig()})
}
- logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevice.Id})
assert.NoError(t, err)
m := jsonpb.Marshaler{}
logicalPortsJson, err := m.MarshalToString(logicalPorts)
@@ -1768,7 +1776,7 @@
getOLTDownstreamMplsDoubleTagRules, getOLTDownstreamRules, getOnuDownstreamRules}
for _, callable := range callables {
- _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevices.GetItems()[0].GetId(), FlowMod: callable()})
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevice.Id, FlowMod: callable()})
assert.NoError(t, err)
}
@@ -2031,8 +2039,8 @@
// Wait for adapters to be fully running
var areAdaptersRunning isConditionSatisfied = func() bool {
ready := true
- nb.oltAdaptersLock.RLock()
- defer nb.oltAdaptersLock.RUnlock()
+ nb.onuAdaptersLock.RLock()
+ defer nb.onuAdaptersLock.RUnlock()
for _, adapters := range nb.onuAdapters {
for _, a := range adapters {
ready = ready && a.IsReady()
@@ -2041,8 +2049,8 @@
}
}
}
- nb.onuAdaptersLock.RLock()
- defer nb.onuAdaptersLock.RUnlock()
+ nb.oltAdaptersLock.RLock()
+ defer nb.oltAdaptersLock.RUnlock()
for _, adapters := range nb.oltAdapters {
for _, a := range adapters {
ready = ready && a.IsReady()
@@ -2061,6 +2069,39 @@
nb.testAdapterRegistration(t, nbi)
}
+func WaitForCoreConnectionToAdapters(ctx context.Context, t *testing.T, nb *NBTest, nbi voltha.VolthaServiceClient) {
+ // Create/register the adapters
+ start := time.Now()
+ numAdapters := 0
+ nb.oltAdaptersLock.RLock()
+ numAdapters += len(nb.onuAdapters)
+ nb.oltAdaptersLock.RUnlock()
+ nb.onuAdaptersLock.RLock()
+ numAdapters += len(nb.oltAdapters)
+ nb.onuAdaptersLock.RUnlock()
+
+ // Wait for adapters to be fully running
+ var isCoreConnectedToAdapters isConditionSatisfied = func() bool {
+ adpts, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+ if err != nil || len(adpts.Items) < numAdapters {
+ return false
+ }
+ // Now check the last communication time
+ for _, adpt := range adpts.Items {
+ if time.Since(time.Unix(adpt.LastCommunication, 0)) > 5*time.Second {
+ return false
+ }
+ }
+ return true
+ }
+ err := waitUntilCondition(nb.internalTimeout, isCoreConnectedToAdapters)
+ assert.Nil(t, err)
+ logger.Infow(ctx, "core-connection-to-adapters-is-ready", log.Fields{"time-taken": time.Since(start)})
+
+ // Test adapter registration
+ nb.testAdapterRegistration(t, nbi)
+}
+
//TestLogDeviceUpdate is used to extract and format device updates. Not to be run on jenkins.
func TestLogDeviceUpdate(t *testing.T) {
t.Skip()
@@ -2134,6 +2175,9 @@
// Setup the adapters
setupAdapters(ctx, t, nb, coreAPIEndpoint, nbi)
+ // Wait until the Core can connect to the adapters
+ WaitForCoreConnectionToAdapters(ctx, t, nb, nbi)
+
// Start the change events listener and dispatcher to receive all change events from the Core
nb.changeEventLister = NewChangedEventListener(len(nb.oltAdapters))
ch := make(chan *ofp.ChangeEvent, (nb.numONUPerOLT+1)*len(nb.oltAdapters))