VOL-2850 Reworked RequestQueue so it no longer requires a dedicated goroutine.
Also removed the Start()/Stop() functions, which are no longer needed.
Also changed to an unbounded queue (doubly-linked-list implementation).
Change-Id: I891dcf68b64c3a08088b6d10fa30dadb8eb6f28d
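
For reviewers, a minimal usage sketch of the reworked API: callers no longer start or stop
the queue, they simply wait for their turn and signal completion. The consumer code below is
hypothetical and the import path is assumed to be the rw_core utils package.

    // Hypothetical consumer; import path assumed.
    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/opencord/voltha-go/rw_core/utils"
    )

    func main() {
        // No Start()/Stop() calls are needed any more.
        rq := utils.NewRequestQueue()

        // Give up if earlier requests hold the queue for more than a second.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        if err := rq.WaitForGreenLight(ctx); err != nil {
            fmt.Println("request aborted:", err)
            return
        }
        // We now have the green light; release it so the next request can run.
        defer rq.RequestComplete()

        fmt.Println("processing request")
    }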
diff --git a/rw_core/utils/common.go b/rw_core/utils/common.go
deleted file mode 100644
index 679788f..0000000
--- a/rw_core/utils/common.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package utils common Logger initialization
-package utils
-
-import (
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-var logger log.Logger
-
-func init() {
- // Setup this package so that it's log level can be modified at run time
- var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "utils"})
- if err != nil {
- panic(err)
- }
-}
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1030735..185a8e8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -22,7 +22,6 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -46,94 +45,93 @@
}
type request struct {
- channel chan struct{}
- done chan struct{}
-}
-
-func newRequest() *request {
- return &request{
- channel: make(chan struct{}),
- done: make(chan struct{}),
- }
+ prev, next *request
+ notifyOnComplete chan<- struct{}
}
// RequestQueue represents a request processing queue where each request is processed to completion before another
// request is given the green light to proceed.
type RequestQueue struct {
- queue chan *request
- requestCompleteIndication chan struct{}
- queueID string
- stopOnce sync.Once
- stopped bool
+ mutex sync.Mutex
+
+ last, current *request
+ lastCompleteCh <-chan struct{}
}
-// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
-// for logging.
-func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
- return &RequestQueue{
- queueID: queueID,
- queue: make(chan *request, maxQueueSize),
- requestCompleteIndication: make(chan struct{}),
- }
-}
-
-// Start starts the request processing queue in its own go routine
-func (rq *RequestQueue) Start() {
- go func() {
- for {
- req, ok := <-rq.queue
- if !ok {
- logger.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
- break
- }
- // If the request is waiting then closing the reqChnl will trigger the request to proceed. Otherwise,
- // if the request was cancelled then this will just clean up.
- close(req.channel)
-
- // Wait for either a request complete indication or a request aborted due to timeout
- select {
- case <-req.done:
- case <-rq.requestCompleteIndication:
- }
- }
- }()
+// NewRequestQueue creates a new request queue
+func NewRequestQueue() *RequestQueue {
+ ch := make(chan struct{})
+ close(ch) // assume the "current" request is already complete
+ return &RequestQueue{lastCompleteCh: ch}
}
// WaitForGreenLight is invoked by a function processing a request to receive the green light before
// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
// too long (previous requests taking too long)
func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
- if rq.stopped {
- return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+ // add ourselves to the end of the queue
+ rq.mutex.Lock()
+ waitingOn := rq.lastCompleteCh
+
+ ch := make(chan struct{})
+ rq.lastCompleteCh = ch
+ r := &request{notifyOnComplete: ch}
+
+ if rq.last != nil {
+ rq.last.next, r.prev = r, rq.last
}
- request := newRequest()
- // Queue the request
- rq.queue <- request
+ rq.last = r
+ rq.mutex.Unlock()
+
+ // wait for our turn
select {
- case <-request.channel:
- return nil
case <-ctx.Done():
- close(request.done)
+ // canceled, so cleanup
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ // non-blocking check: did our turn already arrive while the context was being canceled?
+ select {
+ case <-waitingOn:
+ // context is canceled, but the green light has been acquired, so just release it immediately
+ rq.current = r
+ rq.releaseWithoutLock()
+ default:
+ // on abort, skip our position in the queue by handing our completion channel to the previous request
+ r.prev.notifyOnComplete = r.notifyOnComplete
+ // and remove ourselves from the queue
+ if r.next != nil { // if we are somewhere in the middle of the queue
+ r.prev.next = r.next
+ r.next.prev = r.prev
+ } else { // if we are at the end of the queue
+ rq.last = r.prev
+ r.prev.next = nil
+ }
+ }
return ctx.Err()
+
+ case <-waitingOn:
+ // lock is acquired
+ rq.current = r
+ return nil
}
}
// RequestComplete must be invoked by a process when it completes processing the request. That process must have
// invoked WaitForGreenLight() before.
func (rq *RequestQueue) RequestComplete() {
- if !rq.stopped {
- rq.requestCompleteIndication <- struct{}{}
- }
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ rq.releaseWithoutLock()
}
-// Stop must only be invoked by the process that started the request queue. Prior to invoking Stop, WaitForGreenLight
-// must be invoked.
-func (rq *RequestQueue) Stop() {
- rq.stopOnce.Do(func() {
- rq.stopped = true
- close(rq.requestCompleteIndication)
- close(rq.queue)
- })
+func (rq *RequestQueue) releaseWithoutLock() {
+ // Notify the next waiting request. This will panic if the lock is released more than once.
+ close(rq.current.notifyOnComplete)
+
+ if rq.current.next != nil {
+ rq.current.next.prev = nil
+ }
}
// Response -
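
As a sanity check on the new cancellation path, the following test-style sketch (hypothetical,
written against package utils; only NewRequestQueue, WaitForGreenLight and RequestComplete come
from this change) shows that a waiter whose context expires drops out of the queue without
blocking the callers queued behind it:

    // Hypothetical example test for package utils.
    package utils

    import (
        "context"
        "fmt"
        "time"
    )

    func ExampleRequestQueue_cancellation() {
        rq := NewRequestQueue()

        // First caller gets the green light immediately and starts working.
        _ = rq.WaitForGreenLight(context.Background())

        // Second caller gives up after 10ms while the first is still busy.
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        defer cancel()
        if err := rq.WaitForGreenLight(ctx); err != nil {
            fmt.Println("second caller timed out")
        }

        // Third caller waits with no deadline; it proceeds as soon as the
        // first caller completes, even though the second caller aborted.
        done := make(chan struct{})
        go func() {
            defer close(done)
            if rq.WaitForGreenLight(context.Background()) == nil {
                fmt.Println("third caller got the green light")
                rq.RequestComplete()
            }
        }()

        rq.RequestComplete() // first caller finishes
        <-done

        // Output:
        // second caller timed out
        // third caller got the green light
    }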