[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
   received. If there are many requests for a given device then
   ordering introduces some latency. With the recent changes in
   the model, along with holding the request lock for as short a
   time as possible, that latency is reduced. Testing did not
   show any noticeable latency. A usage sketch of the per-device
   request queue is shown after this list.

2) Hold the request lock from the moment a request starts
   processing to the moment that request is sent to Kafka (when
   applicable). Adapter responses are received and processed
   asynchronously, so an adapter can take all the time it needs
   to process a transaction. The Core still uses a context with
   a configurable timeout to cater for cases where the adapter
   does not return a response (see the second sketch after this
   list).

3) Adapter requests are processed to completion before a
   response is sent back to the adapter. Previously, in some
   cases, a separate goroutine was created to process the
   request while a successful response was sent back to the
   adapter immediately. Now, if the request fails, the adapter
   receives an error. Adapter requests for a given device are
   therefore processed in the order they are received.

4) The way a handler is retrieved to execute a device state
   transition has changed. This was necessary because some
   overlapping transitions were found.
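
Below is a minimal usage sketch of the per-device RequestQueue added
in rw_core/utils/core_utils.go. The deviceAgent type, the
processRequest helper, the queue depth and the import path are
assumptions made for illustration only; they are not part of this
change.

    package example

    import (
        "context"
        "time"

        // Import path assumed for illustration.
        coreutils "github.com/opencord/voltha-go/rw_core/utils"
    )

    // deviceAgent is a hypothetical holder of a per-device request queue.
    type deviceAgent struct {
        deviceID     string
        requestQueue *coreutils.RequestQueue
    }

    func newDeviceAgent(deviceID string) *deviceAgent {
        da := &deviceAgent{
            deviceID: deviceID,
            // One queue per device keeps that device's requests ordered
            // (the queue depth of 100 is arbitrary for this sketch).
            requestQueue: coreutils.NewRequestQueue(deviceID, 100),
        }
        da.requestQueue.Start()
        return da
    }

    // processRequest serializes work on the device: it waits for the
    // green light, runs the work (e.g. updating the model and sending
    // the request to Kafka), then releases the queue so the next
    // request can proceed.
    func (da *deviceAgent) processRequest(ctx context.Context, work func() error) error {
        // Bound the wait in case earlier requests take too long.
        waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
        defer cancel()
        if err := da.requestQueue.WaitForGreenLight(waitCtx); err != nil {
            return err
        }
        // Release the queue once this request is done or handed off to Kafka.
        defer da.requestQueue.RequestComplete()
        return work()
    }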

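The diff below also changes WaitForNilOrErrorResponses to take a
time.Duration instead of a timeout in milliseconds. A minimal sketch
of calling it with the new signature; the helper name and the import
path are assumptions for illustration:

    package example

    import (
        "time"

        // Import path assumed for illustration.
        coreutils "github.com/opencord/voltha-go/rw_core/utils"
    )

    // waitForAdapterResponses is a hypothetical helper that blocks until
    // all responses have completed, one of them reports an error, or the
    // timeout expires, and returns the first error found (if any).
    func waitForAdapterResponses(timeout time.Duration, responses ...coreutils.Response) error {
        if errs := coreutils.WaitForNilOrErrorResponses(timeout, responses...); errs != nil {
            for _, err := range errs {
                if err != nil {
                    return err
                }
            }
        }
        return nil
    }
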
Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 82465ef..bffd9c4 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -17,13 +17,19 @@
 package utils
 
 import (
+	"context"
 	"os"
+	"sync"
 	"time"
 
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+// ResponseCallback is the function signature for callbacks to execute after a response is received.
+type ResponseCallback func(rpc string, response interface{}, reqArgs ...interface{})
+
 // DeviceID represent device id attribute
 type DeviceID struct {
 	ID string
@@ -39,6 +45,97 @@
 	return os.Getenv("HOSTNAME")
 }
 
+type request struct {
+	channel chan struct{}
+	done    chan struct{}
+}
+
+func newRequest() *request {
+	return &request{
+		channel: make(chan struct{}),
+		done:    make(chan struct{}),
+	}
+}
+
+// RequestQueue represents a request processing queue where each request is processed to completion before another
+// request is given the green light to proceed.
+type RequestQueue struct {
+	queue                     chan *request
+	requestCompleteIndication chan struct{}
+	queueID                   string
+	stopOnce                  sync.Once
+	stopped                   bool
+}
+
+// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
+// for logging.
+func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
+	return &RequestQueue{
+		queueID:                   queueID,
+		queue:                     make(chan *request, maxQueueSize),
+		requestCompleteIndication: make(chan struct{}),
+	}
+}
+
+// Start starts the request processing queue in its own go routine
+func (rq *RequestQueue) Start() {
+	go func() {
+		for {
+			req, ok := <-rq.queue
+			if !ok {
+				log.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
+				break
+			}
+			// If the request is waiting then closing req.channel will trigger the request to proceed.  Otherwise,
+			// if the request was cancelled then this will just clean up.
+			close(req.channel)
+
+			// Wait for either a request complete indication or a request aborted due to timeout
+			select {
+			case <-req.done:
+			case <-rq.requestCompleteIndication:
+			}
+		}
+	}()
+}
+
+// WaitForGreenLight is invoked by a function processing a request to receive the green light before
+// proceeding.  The caller can also provide a context with timeout.  The timeout will be triggered if the wait is
+// too long (previous requests taking too long)
+func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
+	if rq.stopped {
+		return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
+	}
+	request := newRequest()
+	// Queue the request
+	rq.queue <- request
+	select {
+	case <-request.channel:
+		return nil
+	case <-ctx.Done():
+		close(request.done)
+		return ctx.Err()
+	}
+}
+
+// RequestComplete must be invoked by a process when it completes processing the request.  That process must have
+// invoked WaitForGreenLight() before.
+func (rq *RequestQueue) RequestComplete() {
+	if !rq.stopped {
+		rq.requestCompleteIndication <- struct{}{}
+	}
+}
+
+// Stop must only be invoked by the process that started the request queue.   Prior to invoking Stop, WaitForGreenLight
+// must be invoked.
+func (rq *RequestQueue) Stop() {
+	rq.stopOnce.Do(func() {
+		rq.stopped = true
+		close(rq.requestCompleteIndication)
+		close(rq.queue)
+	})
+}
+
 // Response -
 type Response struct {
 	*response
@@ -91,9 +188,9 @@
 //The error will be at the index corresponding to the order in which the channel appear in the parameter list.
 //If no errors is found then nil is returned.  This method also takes in a timeout in milliseconds. If a
 //timeout is obtained then this function will stop waiting for the remaining responses and abort.
-func WaitForNilOrErrorResponses(timeout int64, responses ...Response) []error {
+func WaitForNilOrErrorResponses(timeout time.Duration, responses ...Response) []error {
 	timedOut := make(chan struct{})
-	timer := time.AfterFunc(time.Duration(timeout)*time.Millisecond, func() { close(timedOut) })
+	timer := time.AfterFunc(timeout, func() { close(timedOut) })
 	defer timer.Stop()
 
 	gotError := false