VOL-2850 Reworked RequestQueue so it no longer requires a dedicated goroutine.

Also removed the Start()/Stop() methods, which are no longer needed,
and switched to an unbounded queue (doubly-linked-list implementation).
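
A minimal usage sketch of the reworked API (the wrapping main() and the
one-second timeout are illustrative, not part of this change):

    package main

    import (
    	"context"
    	"time"

    	coreutils "github.com/opencord/voltha-go/rw_core/utils"
    )

    func main() {
    	rq := coreutils.NewRequestQueue()

    	// bound the wait so a stuck predecessor cannot block us forever
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()

    	// wait for our turn; returns ctx.Err() if earlier requests take too long
    	if err := rq.WaitForGreenLight(ctx); err != nil {
    		return
    	}
    	// let the next queued request proceed once we are done
    	defer rq.RequestComplete()

    	// ... process the request ...
    }

Because each waiter now chains off the previous request's completion
channel, no background goroutine is needed to dispatch the queue.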

Change-Id: I891dcf68b64c3a08088b6d10fa30dadb8eb6f28d
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 1df4572..37a79c8 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -38,10 +38,6 @@
 	"google.golang.org/grpc/status"
 )
 
-const (
-	maxOrderedDeviceRequestQueueSize = 1000
-)
-
 // DeviceAgent represents device agent attributes
 type DeviceAgent struct {
 	deviceID         string
@@ -81,7 +77,7 @@
 	agent.clusterDataProxy = cdProxy
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
-	agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
+	agent.requestQueue = coreutils.NewRequestQueue()
 	return &agent
 }
 
@@ -102,10 +98,6 @@
 		}
 	}()
 
-	// Start the request queue.  If this start fails then stop will be invoked and it requires
-	// that the request sequencer is present
-	agent.requestQueue.Start()
-
 	var device *voltha.Device
 	if deviceToCreate == nil {
 		// Load the existing device
@@ -190,9 +182,6 @@
 		logger.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
 	}
 
-	// Stop the request queue - no more requests can be processed
-	agent.requestQueue.Stop()
-
 	close(agent.exitChannel)
 
 	agent.stopped = true
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9dc873b..4208e32 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -39,10 +39,6 @@
 	"google.golang.org/grpc/status"
 )
 
-const (
-	maxOrderedLogicalDeviceRequestQueueSize = 1000
-)
-
 // LogicalDeviceAgent represent attributes of logical device agent
 type LogicalDeviceAgent struct {
 	logicalDeviceID    string
@@ -83,7 +79,7 @@
 	agent.portProxies = make(map[string]*model.Proxy)
 	agent.logicalPortsNo = make(map[uint32]bool)
 	agent.defaultTimeout = timeout
-	agent.requestQueue = coreutils.NewRequestQueue(agent.serialNumber, maxOrderedLogicalDeviceRequestQueueSize)
+	agent.requestQueue = coreutils.NewRequestQueue()
 	return &agent
 }
 
@@ -105,9 +101,6 @@
 		}
 	}()
 
-	// Launch the request queue - it will launch a go routine
-	agent.requestQueue.Start()
-
 	var ld *voltha.LogicalDevice
 	if !loadFromDB {
 		//Build the logical device based on information retrieved from the device adapter
@@ -246,9 +239,6 @@
 			logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
 		}
 
-		// Stop the request queue and request complete indication
-		agent.requestQueue.Stop()
-
 		close(agent.exitChannel)
 
 		logger.Info("logical_device-agent-stopped")
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 3f40d6a..0babfad 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -487,7 +487,6 @@
 	clonedLD.DatapathId = rand.Uint64()
 	lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
 	lDeviceAgent.logicalDevice = clonedLD
-	lDeviceAgent.requestQueue.Start()
 	added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
 	assert.Nil(t, err)
 	assert.NotNil(t, added)
diff --git a/rw_core/utils/common.go b/rw_core/utils/common.go
deleted file mode 100644
index 679788f..0000000
--- a/rw_core/utils/common.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package utils common Logger initialization
-package utils
-
-import (
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-var logger log.Logger
-
-func init() {
-	// Setup this package so that it's log level can be modified at run time
-	var err error
-	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "utils"})
-	if err != nil {
-		panic(err)
-	}
-}
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1030735..185a8e8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -22,7 +22,6 @@
 	"sync"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -46,94 +45,93 @@
 }
 
 type request struct {
-	channel chan struct{}
-	done    chan struct{}
-}
-
-func newRequest() *request {
-	return &request{
-		channel: make(chan struct{}),
-		done:    make(chan struct{}),
-	}
+	prev, next       *request
+	notifyOnComplete chan<- struct{}
 }
 
 // RequestQueue represents a request processing queue where each request is processed to completion before another
 // request is given the green light to proceed.
 type RequestQueue struct {
-	queue                     chan *request
-	requestCompleteIndication chan struct{}
-	queueID                   string
-	stopOnce                  sync.Once
-	stopped                   bool
+	mutex sync.Mutex
+
+	last, current  *request
+	lastCompleteCh <-chan struct{}
 }
 
-// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
-// for logging.
-func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
-	return &RequestQueue{
-		queueID:                   queueID,
-		queue:                     make(chan *request, maxQueueSize),
-		requestCompleteIndication: make(chan struct{}),
-	}
-}
-
-// Start starts the request processing queue in its own go routine
-func (rq *RequestQueue) Start() {
-	go func() {
-		for {
-			req, ok := <-rq.queue
-			if !ok {
-				logger.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
-				break
-			}
-			// If the request is waiting then closing the reqChnl will trigger the request to proceed.  Otherwise,
-			// if the request was cancelled then this will just clean up.
-			close(req.channel)
-
-			// Wait for either a request complete indication or a request aborted due to timeout
-			select {
-			case <-req.done:
-			case <-rq.requestCompleteIndication:
-			}
-		}
-	}()
+// NewRequestQueue creates a new request queue
+func NewRequestQueue() *RequestQueue {
+	ch := make(chan struct{})
+	close(ch) // assume the "current" request is already complete
+	return &RequestQueue{lastCompleteCh: ch}
 }
 
 // WaitForGreenLight is invoked by a function processing a request to receive the green light before
 // proceeding.  The caller can also provide a context with timeout.  The timeout will be triggered if the wait is
 // too long (previous requests taking too long)
 func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
-	if rq.stopped {
-		return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+	// add ourselves to the end of the queue
+	rq.mutex.Lock()
+	waitingOn := rq.lastCompleteCh
+
+	ch := make(chan struct{})
+	rq.lastCompleteCh = ch
+	r := &request{notifyOnComplete: ch}
+
+	if rq.last != nil {
+		rq.last.next, r.prev = r, rq.last
 	}
-	request := newRequest()
-	// Queue the request
-	rq.queue <- request
+	rq.last = r
+	rq.mutex.Unlock()
+
+	// wait for our turn
 	select {
-	case <-request.channel:
-		return nil
 	case <-ctx.Done():
-		close(request.done)
+		// context canceled, so clean up before giving up the wait
+		rq.mutex.Lock()
+		defer rq.mutex.Unlock()
+
+		select {
+		case <-waitingOn:
+			// the green light arrived just as the context was canceled; release it immediately
+			rq.current = r
+			rq.releaseWithoutLock()
+		default:
+			// not yet notified; on abort, skip our position in the queue
+			r.prev.notifyOnComplete = r.notifyOnComplete
+			// and remove ourselves from the queue
+			if r.next != nil { // if we are somewhere in the middle of the queue
+				r.prev.next = r.next
+				r.next.prev = r.prev
+			} else { // if we are at the end of the queue
+				rq.last = r.prev
+				r.prev.next = nil
+			}
+		}
 		return ctx.Err()
+
+	case <-waitingOn:
+		// lock is acquired
+		rq.current = r
+		return nil
 	}
 }
 
 // RequestComplete must be invoked by a process when it completes processing the request.  That process must have
 // invoked WaitForGreenLight() before.
 func (rq *RequestQueue) RequestComplete() {
-	if !rq.stopped {
-		rq.requestCompleteIndication <- struct{}{}
-	}
+	rq.mutex.Lock()
+	defer rq.mutex.Unlock()
+
+	rq.releaseWithoutLock()
 }
 
-// Stop must only be invoked by the process that started the request queue.   Prior to invoking Stop, WaitForGreenLight
-// must be invoked.
-func (rq *RequestQueue) Stop() {
-	rq.stopOnce.Do(func() {
-		rq.stopped = true
-		close(rq.requestCompleteIndication)
-		close(rq.queue)
-	})
+func (rq *RequestQueue) releaseWithoutLock() {
+	// Notify the next waiting request.  This will panic if the lock is released more than once.
+	close(rq.current.notifyOnComplete)
+
+	if rq.current.next != nil {
+		rq.current.next.prev = nil
+	}
 }
 
 // Response -