VOL-2850 Reworked RequestQueue so it no longer requires a separate thread.
Also removed the Start()/Stop() functions, which are no longer needed.
Also changed to an unbounded queue (doubly-linked list implementation).
Change-Id: I891dcf68b64c3a08088b6d10fa30dadb8eb6f28d
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1030735..185a8e8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -22,7 +22,6 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -46,94 +45,93 @@
}
type request struct {
- channel chan struct{}
- done chan struct{}
-}
-
-func newRequest() *request {
- return &request{
- channel: make(chan struct{}),
- done: make(chan struct{}),
- }
+ prev, next *request
+ notifyOnComplete chan<- struct{}
}
// RequestQueue represents a request processing queue where each request is processed to completion before another
// request is given the green light to proceed.
type RequestQueue struct {
- queue chan *request
- requestCompleteIndication chan struct{}
- queueID string
- stopOnce sync.Once
- stopped bool
+ mutex sync.Mutex
+
+ last, current *request
+ lastCompleteCh <-chan struct{}
}
-// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
-// for logging.
-func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
- return &RequestQueue{
- queueID: queueID,
- queue: make(chan *request, maxQueueSize),
- requestCompleteIndication: make(chan struct{}),
- }
-}
-
-// Start starts the request processing queue in its own go routine
-func (rq *RequestQueue) Start() {
- go func() {
- for {
- req, ok := <-rq.queue
- if !ok {
- logger.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
- break
- }
- // If the request is waiting then closing the reqChnl will trigger the request to proceed. Otherwise,
- // if the request was cancelled then this will just clean up.
- close(req.channel)
-
- // Wait for either a request complete indication or a request aborted due to timeout
- select {
- case <-req.done:
- case <-rq.requestCompleteIndication:
- }
- }
- }()
+// NewRequestQueue creates an empty request queue whose initial completion channel is already closed, so the first WaitForGreenLight call proceeds without waiting.
+func NewRequestQueue() *RequestQueue {
+ ch := make(chan struct{})
+ close(ch) // no request is in flight yet, so treat the "current" request as already complete
+ return &RequestQueue{lastCompleteCh: ch}
}
// WaitForGreenLight is invoked by a function processing a request to receive the green light before
// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
// too long (previous requests taking too long)
func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
- if rq.stopped {
- return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+ // add ourselves to the end of the queue
+ rq.mutex.Lock()
+ waitingOn := rq.lastCompleteCh
+
+ ch := make(chan struct{})
+ rq.lastCompleteCh = ch
+ r := &request{notifyOnComplete: ch}
+
+ if rq.last != nil {
+ rq.last.next, r.prev = r, rq.last
}
- request := newRequest()
- // Queue the request
- rq.queue <- request
+ rq.last = r
+ rq.mutex.Unlock()
+
+ // wait for our turn
select {
- case <-request.channel:
- return nil
case <-ctx.Done():
- close(request.done)
+ // canceled, so cleanup (the mutex is held below, so waitingOn must only be polled, never waited on)
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ select {
+ case <-waitingOn:
+ // already closed: the lock has been acquired despite the cancellation, so just release it immediately
+ rq.current = r
+ rq.releaseWithoutLock()
+ default:
+ // still open: on abort, skip our position in the queue
+ r.prev.notifyOnComplete = r.notifyOnComplete
+ // and remove ourselves from the queue
+ if r.next != nil { // if we are somewhere in the middle of the queue
+ r.prev.next = r.next
+ r.next.prev = r.prev
+ } else { // if we are at the end of the queue
+ rq.last = r.prev
+ r.prev.next = nil
+ }
+ }
return ctx.Err()
+
+ case <-waitingOn:
+ // lock is acquired
+ rq.current = r
+ return nil
}
}
// RequestComplete must be invoked by a process when it completes processing the request. That process must have
// invoked WaitForGreenLight() before.
func (rq *RequestQueue) RequestComplete() {
- if !rq.stopped {
- rq.requestCompleteIndication <- struct{}{}
- }
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ rq.releaseWithoutLock()
}
-// Stop must only be invoked by the process that started the request queue. Prior to invoking Stop, WaitForGreenLight
-// must be invoked.
-func (rq *RequestQueue) Stop() {
- rq.stopOnce.Do(func() {
- rq.stopped = true
- close(rq.requestCompleteIndication)
- close(rq.queue)
- })
+func (rq *RequestQueue) releaseWithoutLock() {
+ // Callers hold rq.mutex. Closing the channel gives the green light to the request waiting on it; close panics on an already-closed channel, i.e. if the same request is released more than once.
+ close(rq.current.notifyOnComplete)
+
+ if rq.current.next != nil {
+ rq.current.next.prev = nil
+ }
}
// Response -