[VOL-1913] Replies with errors of the form "transaction-not-acquired" are now ignored.

Also had to add a conditional branch in case no valid responses are received (the last thread to exit must clean up the connection).
The rw cores will now respond with a "transaction-not-acquired" error in place of "failed-to-seize-request" or COMPLETED_BY_OTHER.

Change-Id: I199d4a0091ba4fc1db5b8097adbad951408e5034
diff --git a/afrouter/afrouter/request.go b/afrouter/afrouter/request.go
index 0af20e3..8fc15aa 100644
--- a/afrouter/afrouter/request.go
+++ b/afrouter/afrouter/request.go
@@ -22,6 +22,8 @@
 	"errors"
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"io"
 	"sync"
 )
@@ -48,6 +50,8 @@
 	isStreamingResponse bool
 }
 
+var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
+
 // catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
 func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
 	r.streams[connName] = stream
@@ -78,6 +82,10 @@
 	activeStream := false
 	for {
 		err = stream.RecvMsg(&frame)
+		// if this is an inactive responder, ignore everything it sends
+		if err != nil && err.Error() == transactionNotAcquiredErrorString {
+			break
+		}
 		// the first thread to reach this point (first to receive a response frame) will become the active stream
 		r.activeResponseStreamOnce.Do(func() { activeStream = true })
 		if err != nil {
@@ -123,6 +131,11 @@
 	delete(r.streams, connName)
 	streamsLeft := len(r.streams)
 
+	// handle the case where no cores are the active responder.  Should never happen, but just in case...
+	if streamsLeft == 0 {
+		r.activeResponseStreamOnce.Do(func() { activeStream = true })
+	}
+
 	// if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
 	if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
 		// request is complete, cleanup
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 2b28fa5..bd84bdd 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -72,7 +72,7 @@
 		log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
 		return txn, nil
 	} else {
-		return nil, errors.New("failed-to-seize-request")
+		return nil, errorTransactionNotAcquired
 	}
 }
 
@@ -93,7 +93,7 @@
 			log.Debugw("processing-request", log.Fields{"Id": devId})
 			return txn, nil
 		} else {
-			return nil, errors.New("failed-to-seize-request")
+			return nil, errorTransactionNotAcquired
 		}
 	} else {
 		log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
@@ -101,7 +101,7 @@
 			log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
 			return txn, nil
 		} else {
-			return nil, errors.New("device-not-owned")
+			return nil, errorTransactionNotAcquired
 		}
 	}
 }
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7f45875..987a3f6 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -138,7 +138,7 @@
 	} else if txn.Acquired(timeout) {
 		return txn, nil
 	} else {
-		return nil, errors.New("failed-to-seize-request")
+		return nil, errorTransactionNotAcquired
 	}
 }
 
@@ -166,7 +166,7 @@
 			log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
 			return txn, nil
 		} else {
-			return nil, errors.New("failed-to-seize-request")
+			return nil, errorTransactionNotAcquired
 		}
 	} else {
 		if txn.Monitor(timeout) {
@@ -174,7 +174,7 @@
 			return txn, nil
 		} else {
 			log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
-			return nil, errors.New(string(COMPLETED_BY_OTHER))
+			return nil, errorTransactionNotAcquired
 		}
 	}
 }
@@ -557,8 +557,8 @@
 
 	if handler.competeForTransaction() {
 		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
-			// Remove the device in memory
-			if err.Error() == (errors.New(string(COMPLETED_BY_OTHER)).Error()) {
+			if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
+				// Remove the device in memory
 				handler.deviceMgr.stopManagingDevice(id.Id)
 			}
 			return new(empty.Empty), err
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 0d30340..b2d01a4 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -37,6 +37,8 @@
 import (
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"time"
 )
 
@@ -50,6 +52,8 @@
 	STOPPED_WAITING_FOR_KEY
 )
 
+var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+
 const (
 	TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
 )