[VOL-1997] Remove transaction timeout for a non-active rw_core

This commit cleans up the transaction processing between the two
cores in a pair.  It prevents the core that is not processing a
request from grabbing that request based on a timeout alone.

Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, along with some basic manual tests with
kind-voltha.

There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.

Minor updates after first review.
Comment updates after second review.

Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 987a3f6..482abfc 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,6 @@
 	"google.golang.org/grpc/status"
 	"io"
 	"sync"
-	"time"
 )
 
 const (
@@ -124,62 +123,41 @@
 	return handler.coreInCompetingMode
 }
 
-// acquireRequest handles transaction processing for device creation and  list requests, i.e. when there are no
-// specific id requested (list scenario) or id present in the request (creation use case).
-func (handler *APIHandler) acquireRequest(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
+// acquisition.  If the device is owned by this Core (in a core-pair) then acquire the transaction with a
+// timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
+// core-pair to proceed with it).  If the device is not owned then this Core will just monitor the transaction
+// for potential timeouts.
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
-	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn, err := handler.createKvTransaction(ctx)
 	if txn == nil {
 		return nil, err
-	} else if txn.Acquired(timeout) {
+	}
+	var acquired bool
+	if id != nil {
+		var ownedByMe bool
+		if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
+			return nil, errorTransactionInvalidId
+		}
+		acquired, err = txn.Acquired(timeout, ownedByMe)
+	} else {
+		acquired, err = txn.Acquired(timeout)
+	}
+	if err == nil && acquired {
+		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
 		return txn, nil
 	} else {
+		log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
 		return nil, errorTransactionNotAcquired
 	}
 }
 
-// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
-// acquisition.  If the device is owned by this Core (in a core-pair) then acquire the transaction with a
-// timeout value (in the event of a timeout the other Core in the core-pair will proceed with the transaction).  If the
-// device is not owned then this Core will just monitor the transaction for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
-	t := time.Now()
-	timeout := handler.defaultRequestTimeout
-	if len(maxTimeout) > 0 {
-		timeout = maxTimeout[0]
-	}
-	txn, err := handler.createKvTransaction(ctx)
-	if txn == nil {
-		return nil, err
-	}
-
-	owned := false
-	if id != nil {
-		owned = handler.core.deviceOwnership.OwnedByMe(id)
-	}
-	if owned {
-		if txn.Acquired(timeout) {
-			log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
-		}
-	} else {
-		if txn.Monitor(timeout) {
-			log.Debugw("acquired-transaction-after-timeout", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
-			return txn, nil
-		} else {
-			log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
-			return nil, errorTransactionNotAcquired
-		}
-	}
-}
-
-// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where a nil
 // response is expected in a successful scenario
 func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
 	select {
@@ -390,7 +368,7 @@
 func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
 	log.Debug("ListLogicalDevices-request")
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireRequest(ctx); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return &voltha.LogicalDevices{}, err
 		} else {
 			defer txn.Close()
@@ -455,7 +433,7 @@
 
 	if handler.competeForTransaction() {
 		// There are no device Id present in this function.
-		if txn, err := handler.acquireRequest(ctx); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -557,9 +535,11 @@
 
 	if handler.competeForTransaction() {
 		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
-			if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
-				// Remove the device in memory
-				handler.deviceMgr.stopManagingDevice(id.Id)
+			if err == errorTransactionNotAcquired {
+				if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}); !ownedByMe && err == nil {
+					// Remove the device in memory
+					handler.deviceMgr.stopManagingDevice(id.Id)
+				}
 			}
 			return new(empty.Empty), err
 		} else {
@@ -808,7 +788,7 @@
 	//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
 	// request.  For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
 	// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
-	if handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}) {
+	if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}); ownedByMe && err == nil {
 		agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
 		agent.packetOut(packet.PacketOut)
 	}