VOL-2850 Reworked RequestQueue so it no longer requires a separate thread.
Also removed start()/stop() functions, which are no longer needed.
Also changed to an unbounded queue (dequeue implementation).
Change-Id: I891dcf68b64c3a08088b6d10fa30dadb8eb6f28d
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 1df4572..37a79c8 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -38,10 +38,6 @@
"google.golang.org/grpc/status"
)
-const (
- maxOrderedDeviceRequestQueueSize = 1000
-)
-
// DeviceAgent represents device agent attributes
type DeviceAgent struct {
deviceID string
@@ -81,7 +77,7 @@
agent.clusterDataProxy = cdProxy
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
- agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
+ agent.requestQueue = coreutils.NewRequestQueue()
return &agent
}
@@ -102,10 +98,6 @@
}
}()
- // Start the request queue. If this start fails then stop will be invoked and it requires
- // that the request sequencer is present
- agent.requestQueue.Start()
-
var device *voltha.Device
if deviceToCreate == nil {
// Load the existing device
@@ -190,9 +182,6 @@
logger.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
}
- // Stop the request queue - no more requests can be processed
- agent.requestQueue.Stop()
-
close(agent.exitChannel)
agent.stopped = true
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9dc873b..4208e32 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -39,10 +39,6 @@
"google.golang.org/grpc/status"
)
-const (
- maxOrderedLogicalDeviceRequestQueueSize = 1000
-)
-
// LogicalDeviceAgent represent attributes of logical device agent
type LogicalDeviceAgent struct {
logicalDeviceID string
@@ -83,7 +79,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
- agent.requestQueue = coreutils.NewRequestQueue(agent.serialNumber, maxOrderedLogicalDeviceRequestQueueSize)
+ agent.requestQueue = coreutils.NewRequestQueue()
return &agent
}
@@ -105,9 +101,6 @@
}
}()
- // Launch the request queue - it will launch a go routine
- agent.requestQueue.Start()
-
var ld *voltha.LogicalDevice
if !loadFromDB {
//Build the logical device based on information retrieved from the device adapter
@@ -246,9 +239,6 @@
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
- // Stop the request queue and request complete indication
- agent.requestQueue.Stop()
-
close(agent.exitChannel)
logger.Info("logical_device-agent-stopped")
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 3f40d6a..0babfad 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -487,7 +487,6 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- lDeviceAgent.requestQueue.Start()
added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
assert.Nil(t, err)
assert.NotNil(t, added)