[VOL-1997] Remove transaction timeout for a non-active rw_core

This commit cleans up the transaction processing between the two
cores in a pair.  It prevents the core that is not processing the
request from grabbing the request based solely on a timeout.

Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, along with some basic manual tests with
kind-voltha.

There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switchover is done.

Minor updates after first review.
Comment updates after second review.

Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bd84bdd..5247247 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -60,22 +60,6 @@
 	return &proxy
 }
 
-func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
-	timeout := rhp.defaultRequestTimeout
-	if len(maxTimeout) > 0 {
-		timeout = maxTimeout[0]
-	}
-	txn := NewKVTransaction(transactionId)
-	if txn == nil {
-		return nil, errors.New("fail-to-create-transaction")
-	} else if txn.Acquired(timeout) {
-		log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
-		return txn, nil
-	} else {
-		return nil, errorTransactionNotAcquired
-	}
-}
-
 // This is a helper function that attempts to acquire the request by using the device ownership model
 func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := rhp.defaultRequestTimeout
@@ -87,22 +71,24 @@
 		return nil, errors.New("fail-to-create-transaction")
 	}
 
-	if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
-		log.Debugw("owned-by-me", log.Fields{"Id": devId})
-		if txn.Acquired(timeout) {
-			log.Debugw("processing-request", log.Fields{"Id": devId})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
+	var acquired bool
+	var err error
+	if devId != "" {
+		var ownedByMe bool
+		if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}); err != nil {
+			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devId, "error": err})
+			return nil, kafka.ErrorTransactionInvalidId
 		}
+		acquired, err = txn.Acquired(timeout, ownedByMe)
 	} else {
-		log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
-		if txn.Monitor(timeout) {
-			log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
-		}
+		acquired, err = txn.Acquired(timeout)
+	}
+	if err == nil && acquired {
+		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
+		return txn, nil
+	} else {
+		log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
+		return nil, kafka.ErrorTransactionNotAcquired
 	}
 }
 
@@ -120,7 +106,7 @@
 	}
 	adapter := &voltha.Adapter{}
 	deviceTypes := &voltha.DeviceTypes{}
-	transactionID := &ic.StrType{}
+	transactionId := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "adapter":
@@ -134,22 +120,23 @@
 				return nil, err
 			}
 		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+			if err := ptypes.UnmarshalAny(arg.Value, transactionId); err != nil {
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
 				return nil, err
 			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionId": transactionId.Val, "coreId": rhp.coreInstanceId})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// Update our adapters in memory
-			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+		if txn, err := rhp.takeRequestOwnership(transactionId.Val, ""); err != nil {
+			if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
+				log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionId})
+				// Update our adapters in memory
+				go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+			}
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -189,9 +176,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -238,9 +224,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -300,9 +285,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -342,8 +326,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -390,6 +374,16 @@
 		allPorts.Items = append(allPorts.Items, aPort)
 		return allPorts, nil
 	}
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
@@ -421,9 +415,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -505,9 +498,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -565,9 +557,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -580,10 +571,6 @@
 	go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
 		voltha.ConnectStatus_ConnectStatus(connStatus.Val))
 
-	//if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
-	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-	//	return nil, err
-	//}
 	return new(empty.Empty), nil
 }
 
@@ -627,9 +614,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -683,9 +669,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -746,9 +731,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -795,9 +779,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -839,9 +822,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -883,9 +865,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -936,9 +917,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -980,9 +960,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1039,8 +1018,8 @@
 	// duplicates.
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1087,9 +1066,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1132,9 +1110,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}