VOL-2850 Reworked RequestQueue so it no longer requires a separate thread.
Also removed start()/stop() functions, which are no longer needed.
Also changed to an unbounded queue (dequeue implementation).
Change-Id: I891dcf68b64c3a08088b6d10fa30dadb8eb6f28d
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 1df4572..37a79c8 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -38,10 +38,6 @@
"google.golang.org/grpc/status"
)
-const (
- maxOrderedDeviceRequestQueueSize = 1000
-)
-
// DeviceAgent represents device agent attributes
type DeviceAgent struct {
deviceID string
@@ -81,7 +77,7 @@
agent.clusterDataProxy = cdProxy
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
- agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
+ agent.requestQueue = coreutils.NewRequestQueue()
return &agent
}
@@ -102,10 +98,6 @@
}
}()
- // Start the request queue. If this start fails then stop will be invoked and it requires
- // that the request sequencer is present
- agent.requestQueue.Start()
-
var device *voltha.Device
if deviceToCreate == nil {
// Load the existing device
@@ -190,9 +182,6 @@
logger.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
}
- // Stop the request queue - no more requests can be processed
- agent.requestQueue.Stop()
-
close(agent.exitChannel)
agent.stopped = true
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9dc873b..4208e32 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -39,10 +39,6 @@
"google.golang.org/grpc/status"
)
-const (
- maxOrderedLogicalDeviceRequestQueueSize = 1000
-)
-
// LogicalDeviceAgent represent attributes of logical device agent
type LogicalDeviceAgent struct {
logicalDeviceID string
@@ -83,7 +79,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
- agent.requestQueue = coreutils.NewRequestQueue(agent.serialNumber, maxOrderedLogicalDeviceRequestQueueSize)
+ agent.requestQueue = coreutils.NewRequestQueue()
return &agent
}
@@ -105,9 +101,6 @@
}
}()
- // Launch the request queue - it will launch a go routine
- agent.requestQueue.Start()
-
var ld *voltha.LogicalDevice
if !loadFromDB {
//Build the logical device based on information retrieved from the device adapter
@@ -246,9 +239,6 @@
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
- // Stop the request queue and request complete indication
- agent.requestQueue.Stop()
-
close(agent.exitChannel)
logger.Info("logical_device-agent-stopped")
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 3f40d6a..0babfad 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -487,7 +487,6 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- lDeviceAgent.requestQueue.Start()
added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
assert.Nil(t, err)
assert.NotNil(t, added)
diff --git a/rw_core/utils/common.go b/rw_core/utils/common.go
deleted file mode 100644
index 679788f..0000000
--- a/rw_core/utils/common.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package utils common Logger initialization
-package utils
-
-import (
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-var logger log.Logger
-
-func init() {
- // Setup this package so that it's log level can be modified at run time
- var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "utils"})
- if err != nil {
- panic(err)
- }
-}
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1030735..185a8e8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -22,7 +22,6 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -46,94 +45,93 @@
}
type request struct {
- channel chan struct{}
- done chan struct{}
-}
-
-func newRequest() *request {
- return &request{
- channel: make(chan struct{}),
- done: make(chan struct{}),
- }
+ prev, next *request
+ notifyOnComplete chan<- struct{}
}
// RequestQueue represents a request processing queue where each request is processed to completion before another
// request is given the green light to proceed.
type RequestQueue struct {
- queue chan *request
- requestCompleteIndication chan struct{}
- queueID string
- stopOnce sync.Once
- stopped bool
+ mutex sync.Mutex
+
+ last, current *request
+ lastCompleteCh <-chan struct{}
}
-// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
-// for logging.
-func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
- return &RequestQueue{
- queueID: queueID,
- queue: make(chan *request, maxQueueSize),
- requestCompleteIndication: make(chan struct{}),
- }
-}
-
-// Start starts the request processing queue in its own go routine
-func (rq *RequestQueue) Start() {
- go func() {
- for {
- req, ok := <-rq.queue
- if !ok {
- logger.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
- break
- }
- // If the request is waiting then closing the reqChnl will trigger the request to proceed. Otherwise,
- // if the request was cancelled then this will just clean up.
- close(req.channel)
-
- // Wait for either a request complete indication or a request aborted due to timeout
- select {
- case <-req.done:
- case <-rq.requestCompleteIndication:
- }
- }
- }()
+// NewRequestQueue creates a new request queue
+func NewRequestQueue() *RequestQueue {
+ ch := make(chan struct{})
+ close(ch) // assume the "current" request is already complete
+ return &RequestQueue{lastCompleteCh: ch}
}
// WaitForGreenLight is invoked by a function processing a request to receive the green light before
// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
// too long (previous requests taking too long)
func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
- if rq.stopped {
- return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+ // add ourselves to the end of the queue
+ rq.mutex.Lock()
+ waitingOn := rq.lastCompleteCh
+
+ ch := make(chan struct{})
+ rq.lastCompleteCh = ch
+ r := &request{notifyOnComplete: ch}
+
+ if rq.last != nil {
+ rq.last.next, r.prev = r, rq.last
}
- request := newRequest()
- // Queue the request
- rq.queue <- request
+ rq.last = r
+ rq.mutex.Unlock()
+
+ // wait for our turn
select {
- case <-request.channel:
- return nil
case <-ctx.Done():
- close(request.done)
+ // canceled, so cleanup
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ if _, notified := <-waitingOn; !notified {
+ // on abort, skip our position in the queue
+ r.prev.notifyOnComplete = r.notifyOnComplete
+ // and remove ourselves from the queue
+ if r.next != nil { // if we are somewhere in the middle of the queue
+ r.prev.next = r.next
+ r.next.prev = r.prev
+ } else { // if we are at the end of the queue
+ rq.last = r.prev
+ r.prev.next = nil
+ }
+
+ } else {
+ // context is canceled, but lock has been acquired, so just release the lock immediately
+ rq.current = r
+ rq.releaseWithoutLock()
+ }
return ctx.Err()
+
+ case <-waitingOn:
+ // lock is acquired
+ rq.current = r
+ return nil
}
}
// RequestComplete must be invoked by a process when it completes processing the request. That process must have
// invoked WaitForGreenLight() before.
func (rq *RequestQueue) RequestComplete() {
- if !rq.stopped {
- rq.requestCompleteIndication <- struct{}{}
- }
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ rq.releaseWithoutLock()
}
-// Stop must only be invoked by the process that started the request queue. Prior to invoking Stop, WaitForGreenLight
-// must be invoked.
-func (rq *RequestQueue) Stop() {
- rq.stopOnce.Do(func() {
- rq.stopped = true
- close(rq.requestCompleteIndication)
- close(rq.queue)
- })
+func (rq *RequestQueue) releaseWithoutLock() {
+ // Notify the next waiting request. This will panic if the lock is released more than once.
+ close(rq.current.notifyOnComplete)
+
+ if rq.current.next != nil {
+ rq.current.next.prev = nil
+ }
}
// Response -