[VOL-1997] Remove transaction timeout for a non-active rw_core

This commit cleans up the transaction processing between two
cores in a pair.  It prevents the core that is not processing the
request from grabbing the request based on a timeout alone.

Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, as well as some basic manual tests with
kind-voltha.

There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.

Minor updates after first review.
Comment updates after second review.

Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 88c13ae..3af1ef2 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -127,7 +127,15 @@
 
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
-	_, err := c.ectdAPI.Put(ctx, key, val)
+
+	var err error
+	// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
+	// that KV key permanent instead of automatically removing it after a lease expiration
+	if leaseID, ok := c.keyReservations[key]; ok {
+		_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+	} else {
+		_, err = c.ectdAPI.Put(ctx, key, val)
+	}
 	cancel()
 	if err != nil {
 		switch err {
@@ -158,8 +166,8 @@
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
 
-	// delete the keys
-	if _, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix()); err != nil {
+	// delete the key
+	if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
 		log.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
 		return err
 	}
@@ -309,7 +317,7 @@
 func (c *EtcdClient) Watch(key string) chan *Event {
 	w := v3Client.NewWatcher(c.ectdAPI)
 	ctx, cancel := context.WithCancel(context.Background())
-	channel := w.Watch(ctx, key, v3Client.WithPrefix())
+	channel := w.Watch(ctx, key)
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
@@ -317,8 +325,6 @@
 	// Keep track of the created channels so they can be closed when required
 	channelMap := make(map[chan *Event]v3Client.Watcher)
 	channelMap[ch] = w
-	//c.writeLock.Lock()
-	//defer c.writeLock.Unlock()
 
 	channelMaps := c.addChannelMap(key, channelMap)
 
@@ -412,7 +418,6 @@
 	defer close(ch)
 	for resp := range channel {
 		for _, ev := range resp.Events {
-			//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
 			ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
 		}
 	}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 56b5fa1..f9b3319 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -46,6 +46,9 @@
 	FromTopic      = "fromTopic"
 )
 
+var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
+var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
+
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
 // obtained from that channel, this interface is invoked.   This is used to handle
 // async requests into the Core via the kafka messaging bus
@@ -674,15 +677,20 @@
 				// Check for errors first
 				lastIndex := len(out) - 1
 				if out[lastIndex].Interface() != nil { // Error
-					if goError, ok := out[lastIndex].Interface().(error); ok {
-						returnError = &ic.Error{Reason: goError.Error()}
+					if retError, ok := out[lastIndex].Interface().(error); ok {
+						if retError.Error() == ErrorTransactionNotAcquired.Error() {
+							log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+							return // Ignore - process is in competing mode and ignored transaction
+						}
+						returnError = &ic.Error{Reason: retError.Error()}
 						returnedValues = append(returnedValues, returnError)
 					} else { // Should never happen
 						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
-					return // Ignore case - when core is in competing mode
+					log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+					return // Ignore - should not happen
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bd84bdd..5247247 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -60,22 +60,6 @@
 	return &proxy
 }
 
-func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
-	timeout := rhp.defaultRequestTimeout
-	if len(maxTimeout) > 0 {
-		timeout = maxTimeout[0]
-	}
-	txn := NewKVTransaction(transactionId)
-	if txn == nil {
-		return nil, errors.New("fail-to-create-transaction")
-	} else if txn.Acquired(timeout) {
-		log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
-		return txn, nil
-	} else {
-		return nil, errorTransactionNotAcquired
-	}
-}
-
 // This is a helper function that attempts to acquire the request by using the device ownership model
 func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := rhp.defaultRequestTimeout
@@ -87,22 +71,24 @@
 		return nil, errors.New("fail-to-create-transaction")
 	}
 
-	if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
-		log.Debugw("owned-by-me", log.Fields{"Id": devId})
-		if txn.Acquired(timeout) {
-			log.Debugw("processing-request", log.Fields{"Id": devId})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
+	var acquired bool
+	var err error
+	if devId != "" {
+		var ownedByMe bool
+		if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}); err != nil {
+			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devId, "error": err})
+			return nil, kafka.ErrorTransactionInvalidId
 		}
+		acquired, err = txn.Acquired(timeout, ownedByMe)
 	} else {
-		log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
-		if txn.Monitor(timeout) {
-			log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
-		}
+		acquired, err = txn.Acquired(timeout)
+	}
+	if err == nil && acquired {
+		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
+		return txn, nil
+	} else {
+		log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
+		return nil, kafka.ErrorTransactionNotAcquired
 	}
 }
 
@@ -120,7 +106,7 @@
 	}
 	adapter := &voltha.Adapter{}
 	deviceTypes := &voltha.DeviceTypes{}
-	transactionID := &ic.StrType{}
+	transactionId := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "adapter":
@@ -134,22 +120,23 @@
 				return nil, err
 			}
 		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+			if err := ptypes.UnmarshalAny(arg.Value, transactionId); err != nil {
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
 				return nil, err
 			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionId": transactionId.Val, "coreId": rhp.coreInstanceId})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// Update our adapters in memory
-			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+		if txn, err := rhp.takeRequestOwnership(transactionId.Val, ""); err != nil {
+			if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
+				log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionId})
+				// Update our adapters in memory
+				go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+			}
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -189,9 +176,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -238,9 +224,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -300,9 +285,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -342,8 +326,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -390,6 +374,16 @@
 		allPorts.Items = append(allPorts.Items, aPort)
 		return allPorts, nil
 	}
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
@@ -421,9 +415,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -505,9 +498,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -565,9 +557,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -580,10 +571,6 @@
 	go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
 		voltha.ConnectStatus_ConnectStatus(connStatus.Val))
 
-	//if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
-	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-	//	return nil, err
-	//}
 	return new(empty.Empty), nil
 }
 
@@ -627,9 +614,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -683,9 +669,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -746,9 +731,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -795,9 +779,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -839,9 +822,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -883,9 +865,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -936,9 +917,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -980,9 +960,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1039,8 +1018,8 @@
 	// duplicates.
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1087,9 +1066,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
@@ -1132,9 +1110,8 @@
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
-			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
-			return nil, nil
+			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+			return nil, err
 		} else {
 			defer txn.Close()
 		}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 91c359a..1b0f586 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -922,6 +922,10 @@
 	dMgr.addDeviceAgentToMap(agent)
 	agent.start(nil, false)
 
+	// Since this Core has handled this request, it owns this child device.  Set the
+	// ownership of this device to this Core
+	dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: agent.deviceId})
+
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
 		go agent.enableDevice(nil)
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 4b30188..ade876b 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -110,7 +110,7 @@
 	return true
 }
 
-func (da *DeviceOwnership) MonitorOwnership(id string, chnl chan int) {
+func (da *DeviceOwnership) monitorOwnership(id string, chnl chan int) {
 	log.Debugw("start-device-monitoring", log.Fields{"id": id})
 	op := "starting"
 	exit := false
@@ -186,9 +186,9 @@
 	return deviceIds
 }
 
-// OwnedByMe returns where this Core instance active owns this device.   This function will automatically
+// OwnedByMe returns whether this Core instance actively owns this device.   This function will automatically
 // trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(id interface{}) bool {
+func (da *DeviceOwnership) OwnedByMe(id interface{}) (bool, error) {
 	// Retrieve the ownership key based on the id
 	var ownershipKey string
 	var err error
@@ -196,7 +196,7 @@
 	var cache bool
 	if ownershipKey, idStr, cache, err = da.getOwnershipKey(id); err != nil {
 		log.Warnw("no-ownershipkey", log.Fields{"error": err})
-		return false
+		return false, err
 	}
 
 	// Update the deviceToKey map, if not from cache
@@ -215,7 +215,7 @@
 	deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
 	if deviceOwned {
 		log.Debugw("ownership", log.Fields{"Id": ownershipKey, "owned": ownedByMe})
-		return ownedByMe
+		return ownedByMe, nil
 	}
 	// Not owned by me or maybe nobody else.  Try to reserve it
 	reservedByMe := da.tryToReserveKey(ownershipKey)
@@ -229,8 +229,8 @@
 	da.deviceMapLock.Unlock()
 
 	log.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
-	go da.MonitorOwnership(ownershipKey, myChnl)
-	return reservedByMe
+	go da.monitorOwnership(ownershipKey, myChnl)
+	return reservedByMe, nil
 }
 
 //AbandonDevice must be invoked whenever a device is deleted from the Core
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 987a3f6..482abfc 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,6 @@
 	"google.golang.org/grpc/status"
 	"io"
 	"sync"
-	"time"
 )
 
 const (
@@ -124,62 +123,41 @@
 	return handler.coreInCompetingMode
 }
 
-// acquireRequest handles transaction processing for device creation and  list requests, i.e. when there are no
-// specific id requested (list scenario) or id present in the request (creation use case).
-func (handler *APIHandler) acquireRequest(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
+// acquisition.  If the device is owned by this Core (in a core-pair) then acquire the transaction with a
+// timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
+// core-pair to proceed with it).  If the device is not owned then this Core will just monitor the transaction
+// for potential timeouts.
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
-	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn, err := handler.createKvTransaction(ctx)
 	if txn == nil {
 		return nil, err
-	} else if txn.Acquired(timeout) {
+	}
+	var acquired bool
+	if id != nil {
+		var ownedByMe bool
+		if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
+			return nil, errorTransactionInvalidId
+		}
+		acquired, err = txn.Acquired(timeout, ownedByMe)
+	} else {
+		acquired, err = txn.Acquired(timeout)
+	}
+	if err == nil && acquired {
+		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
 		return txn, nil
 	} else {
+		log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
 		return nil, errorTransactionNotAcquired
 	}
 }
 
-// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
-// acquisition.  If the device is owned by this Core (in a core-pair) then acquire the transaction with a
-// timeout value (in the event of a timeout the other Core in the core-pair will proceed with the transaction).  If the
-// device is not owned then this Core will just monitor the transaction for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
-	t := time.Now()
-	timeout := handler.defaultRequestTimeout
-	if len(maxTimeout) > 0 {
-		timeout = maxTimeout[0]
-	}
-	txn, err := handler.createKvTransaction(ctx)
-	if txn == nil {
-		return nil, err
-	}
-
-	owned := false
-	if id != nil {
-		owned = handler.core.deviceOwnership.OwnedByMe(id)
-	}
-	if owned {
-		if txn.Acquired(timeout) {
-			log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
-			return txn, nil
-		} else {
-			return nil, errorTransactionNotAcquired
-		}
-	} else {
-		if txn.Monitor(timeout) {
-			log.Debugw("acquired-transaction-after-timeout", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
-			return txn, nil
-		} else {
-			log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
-			return nil, errorTransactionNotAcquired
-		}
-	}
-}
-
-// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where a nil
 // response is expected in a successful scenario
 func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
 	select {
@@ -390,7 +368,7 @@
 func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
 	log.Debug("ListLogicalDevices-request")
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireRequest(ctx); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return &voltha.LogicalDevices{}, err
 		} else {
 			defer txn.Close()
@@ -455,7 +433,7 @@
 
 	if handler.competeForTransaction() {
 		// There are no device Id present in this function.
-		if txn, err := handler.acquireRequest(ctx); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -557,9 +535,11 @@
 
 	if handler.competeForTransaction() {
 		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
-			if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
-				// Remove the device in memory
-				handler.deviceMgr.stopManagingDevice(id.Id)
+			if err == errorTransactionNotAcquired {
+				if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}); !ownedByMe && err == nil {
+					// Remove the device in memory
+					handler.deviceMgr.stopManagingDevice(id.Id)
+				}
 			}
 			return new(empty.Empty), err
 		} else {
@@ -808,7 +788,7 @@
 	//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
 	// request.  For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
 	// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
-	if handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}) {
+	if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}); ownedByMe && err == nil {
 		agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
 		agent.packetOut(packet.PacketOut)
 	}
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 11f3d4e..1b4370d 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -25,12 +25,6 @@
  * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
  * and then deletes the transaction key.
  *
- * To ensure the key is removed despite possible standby core failures, a KV operation is
- * scheduled in the background on both cores to delete the key well after the transaction is
- * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
- * long enough, on the order of many seconds, to ensure the standby sees the transaction
- * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
- * the KV store.
  */
 package core
 
@@ -48,23 +42,36 @@
 	SEIZED_BY_SELF
 	COMPLETED_BY_OTHER
 	ABANDONED_BY_OTHER
-	STOPPED_WATCHING_KEY
-	STOPPED_WAITING_FOR_KEY
+	ABANDONED_WATCH_BY_SELF
 )
 
 var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+var errorTransactionInvalidId = status.Error(codes.Canceled, "transaction-invalid-id")
 
 const (
 	TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
 )
 
+// Transaction constants used to guarantee the Core processing a request holds on to the transaction until
+// it either completes it (either successfully or times out) or the Core itself crashes (e.g. a server failure).
+// If a request has a timeout of x seconds then the Core processing the request will renew the transaction lease
+// every x/NUM_TXN_RENEWAL_PER_REQUEST seconds. After the Core completes the request it stops renewing the
+// transaction and sets the transaction value to TRANSACTION_COMPLETE. If the processing Core crashes then it
+// will not renew the transaction causing the KV store to delete the transaction after its renewal period.  The
+// Core watching the transaction will then take over.
+// Since the MIN_TXN_RENEWAL_INTERVAL_IN_SEC is 3 seconds then for any transaction that completes within 3 seconds
+// there won't be a transaction renewal done.
+const (
+	NUM_TXN_RENEWAL_PER_REQUEST         = 2
+	MIN_TXN_RENEWAL_INTERVAL_IN_SEC     = 3
+	MIN_TXN_RESERVATION_DURATION_IN_SEC = 5
+)
+
 type TransactionContext struct {
-	kvClient                  kvstore.Client
-	kvOperationTimeout        int
-	monitorLoopTime           int64
-	owner                     string
-	timeToDeleteCompletedKeys int
-	txnPrefix                 string
+	kvClient           kvstore.Client
+	kvOperationTimeout int
+	owner              string
+	txnPrefix          string
 }
 
 var ctx *TransactionContext
@@ -74,8 +81,7 @@
 	"SEIZED-BY-SELF",
 	"COMPLETED-BY-OTHER",
 	"ABANDONED-BY-OTHER",
-	"STOPPED-WATCHING-KEY",
-	"STOPPED-WAITING-FOR-KEY"}
+	"ABANDONED_WATCH_BY_SELF"}
 
 func init() {
 	log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -85,17 +91,13 @@
 	owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
-	kvOpTimeout int,
-	keyDeleteTime int,
-	monLoopTime int64) *TransactionContext {
+	kvOpTimeout int) *TransactionContext {
 
 	return &TransactionContext{
-		owner:                     owner,
-		txnPrefix:                 txnPrefix,
-		kvClient:                  kvClient,
-		kvOperationTimeout:        kvOpTimeout,
-		monitorLoopTime:           monLoopTime,
-		timeToDeleteCompletedKeys: keyDeleteTime}
+		owner:              owner,
+		txnPrefix:          txnPrefix,
+		kvClient:           kvClient,
+		kvOperationTimeout: kvOpTimeout}
 }
 
 /*
@@ -110,26 +112,20 @@
  *                  only the etcd client is supported.
  * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
  *                      used by this package
- * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
- *                       a TRANSACTION_COMPLETE key
- * :param monLoopTime: The time in milliseconds that the monitor sleeps between
- *                     checks for the existence of the transaction key
  */
 func SetTransactionContext(owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
-	kvOpTimeout int,
-	keyDeleteTime int,
-	monLoopTime int64) error {
+	kvOpTimeout int) error {
 
-	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
+	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout)
 	return nil
 }
 
 type KVTransaction struct {
-	ch     chan int
-	txnId  string
-	txnKey string
+	monitorCh chan int
+	txnId     string
+	txnKey    string
 }
 
 /*
@@ -145,128 +141,117 @@
 }
 
 /*
- * This function returns a boolean indicating whether or not the caller should process
- * the request. True is returned in one of two cases:
- * (1) The current core successfully reserved the request's serial number with the KV store
- * (2) The current core failed in its reservation attempt but observed that the serving core
- *     has abandoned processing the request
+ * Acquired is invoked by a Core, upon reception of a request, to reserve the transaction key in the KV store. The
+ * request may be resource specific (i.e. will include an ID for that resource) or may be generic (i.e. list a set of
+ * resources). If the request is resource specific then this function should be invoked with the ownedByMe flag to
+ * indicate whether this Core owns this resource.  In the case where this Core owns this resource or it is a generic
+ * request then we will proceed to reserve the transaction key in the KV store for a minimum time specified by the
+ * minDuration param.  If the reservation request fails (i.e. the other Core got the reservation before this one - this
+ * can happen only for a generic request) then the Core will start to watch for changes to the key to determine
+ * whether the other Core completed the transaction successfully or the Core just died.  If the Core does not own the
+ * resource then we will proceed to watch the transaction key.
  *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation acquired, process the request
- *          false - reservation not acquired, request being processed by another core
+ * :param minDuration: minimum time to reserve the transaction key in the KV store
+ * :param ownedByMe: specify whether the request is about a resource owned or not. If it's absent then this is a
+ * generic request that has no specific resource ID (e.g. list)
+ *
+ * :return: A boolean specifying whether the resource was acquired. The error result is intended for the case where
+ * this function is invoked for a nonexistent resource; the implementation shown here always returns a nil error.
  */
-func (c *KVTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
 	var acquired bool
 	var currOwner string = ""
 	var res int
 
 	// Convert milliseconds to seconds, rounding up
 	// The reservation TTL is specified in seconds
-	durationInSecs := duration / 1000
-	if remainder := duration % 1000; remainder > 0 {
+	durationInSecs := minDuration / 1000
+	if remainder := minDuration % 1000; remainder > 0 {
 		durationInSecs++
 	}
-	value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+	if durationInSecs < int64(MIN_TXN_RESERVATION_DURATION_IN_SEC) {
+		durationInSecs = int64(MIN_TXN_RESERVATION_DURATION_IN_SEC)
+	}
+	genericRequest := true
+	resourceOwned := false
+	if len(ownedByMe) > 0 {
+		genericRequest = false
+		resourceOwned = ownedByMe[0]
+	}
+	if resourceOwned || genericRequest {
+		// Keep the reservation longer than the minDuration (which is really the request timeout) to ensure the
+		// transaction key stays in the KV store until after the Core finalizes a request timeout condition (which is
+		// a success from a request completion perspective).
+		if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+			res = SEIZED_BY_SELF
+		} else {
+			log.Debugw("watch-other-server",
+				log.Fields{"transactionId": c.txnId, "owner": currOwner, "timeout": durationInSecs})
+			res = c.Watch(durationInSecs)
+		}
+	} else {
+		res = c.Watch(durationInSecs)
+	}
+	switch res {
+	case SEIZED_BY_SELF, ABANDONED_BY_OTHER:
+		acquired = true
+	default:
+		acquired = false
+	}
+	log.Debugw("acquire-transaction-status", log.Fields{"transactionId": c.txnId, "acquired": acquired, "result": txnState[res]})
+	return acquired, nil
+}
 
-	// If the reservation failed, do we simply abort or drop into watch mode anyway?
-	// Setting value to nil leads to watch mode
+func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+	var currOwner string = ""
+	var res int
+	value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
 	if value != nil {
-		if currOwner, err = kvstore.ToString(value); err != nil {
-			log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
-			value = nil
+		if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
+			log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnId, "error": err})
+			return err
+		}
+		if currOwner == ctx.owner {
+			log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnId, "result": txnState[res]})
+			// Setup the monitoring channel
+			c.monitorCh = make(chan int)
+			go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+			return nil
 		}
 	}
-	if err == nil && value != nil && currOwner == ctx.owner {
-		// Process the request immediately
-		res = SEIZED_BY_SELF
-	} else {
-		// Another core instance has reserved the request
-		// Watch for reservation expiry or successful request completion
-		log.Debugw("watch-other-server",
-			log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
-
-		res = c.Watch(duration)
-	}
-	// Clean-up: delete the transaction key after a long delay
-	go c.deleteTransactionKey()
-
-	log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
-	switch res {
-	case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
-		acquired = true
-	default:
-		acquired = false
-	}
-	// Ensure the request watcher does not reply before the request server
-	if !acquired {
-		log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
-	}
-	return acquired
+	return status.Error(codes.PermissionDenied, "reservation-denied")
 }
 
-/*
- * This function monitors the progress of a request that's been reserved by another
- * Voltha core.
- *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation abandoned by the other core, process the request
- *          false - reservation not owned, request being processed by another core
- */
-func (c *KVTransaction) Monitor(duration int64) bool {
-	var acquired bool
-	var res int
-
-	// Convert milliseconds to seconds, rounding up
-	// The reservation TTL is specified in seconds
-	durationInSecs := duration / 1000
-	if remainder := duration % 1000; remainder > 0 {
-		durationInSecs++
-	}
-
-	res = c.Watch(duration)
-
-	// Clean-up: delete the transaction key after a long delay
-	go c.deleteTransactionKey()
-
-	log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
-	switch res {
-	case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
-		acquired = true
-	default:
-		acquired = false
-	}
-	// Ensure the request watcher does not reply before the request server
-	if !acquired {
-		log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
-	}
-	return acquired
-}
-
-// duration in milliseconds
-func (c *KVTransaction) Watch(duration int64) int {
+func (c *KVTransaction) Watch(durationInSecs int64) int {
 	var res int
 
 	events := ctx.kvClient.Watch(c.txnKey)
 	defer ctx.kvClient.CloseWatch(c.txnKey, events)
 
+	transactionWasAcquiredByOther := false
+
+	// Check whether the transaction was already completed by the other Core before we got here.
+	if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+		transactionWasAcquiredByOther = true
+		if val, err := kvstore.ToString(kvp.Value); err == nil {
+			if val == TRANSACTION_COMPLETE {
+				res = COMPLETED_BY_OTHER
+				// Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
+				c.Delete()
+				return res
+			}
+		} else {
+			// An unexpected value - let's get out of here as something did not go according to plan
+			res = ABANDONED_WATCH_BY_SELF
+			log.Debugw("cannot-read-transaction-value", log.Fields{"txn": c.txnId, "error": err})
+			return res
+		}
+	}
+
 	for {
 		select {
-		// Add a timeout here in case we miss an event from the KV
-		case <-time.After(time.Duration(duration) * time.Millisecond):
-			// In case of missing events, let's check the transaction key
-			kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
-			if err == nil && kvp == nil {
-				log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
-				res = ABANDONED_BY_OTHER
-			} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
-				log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
-				res = COMPLETED_BY_OTHER
-			} else {
-				log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
-				res = STOPPED_WATCHING_KEY
-			}
-
 		case event := <-events:
+			transactionWasAcquiredByOther = true
 			log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
 			if event.EventType == kvstore.DELETE {
 				// The other core failed to process the request
@@ -274,39 +259,73 @@
 			} else if event.EventType == kvstore.PUT {
 				key, e1 := kvstore.ToString(event.Key)
 				val, e2 := kvstore.ToString(event.Value)
-				if e1 == nil && key == c.txnKey && e2 == nil {
+				if e1 == nil && e2 == nil && key == c.txnKey {
 					if val == TRANSACTION_COMPLETE {
 						res = COMPLETED_BY_OTHER
-						// Successful request completion has been detected
-						// Remove the transaction key
+						// Successful request completion has been detected. Remove the transaction key
 						c.Delete()
 					} else {
-						log.Debugf("Ignoring reservation PUT event with val %v for key %v",
-							val, key)
+						log.Debugw("Ignoring-PUT-event", log.Fields{"val": val, "key": key})
 						continue
 					}
+				} else {
+					log.Warnw("received-unexpected-PUT-event", log.Fields{"txn": c.txnId, "key": key, "ctxKey": c.txnKey})
 				}
 			}
+		case <-time.After(time.Duration(durationInSecs) * time.Second):
+			// Corner case: In the case where the Core owning the device dies and before this Core takes ownership of
+			// this device there is a window where new requests will end up being watched instead of being processed.
+			// Grab the request if the other Core did not create the transaction in the KV store.
+			// TODO: Use a peer-monitoring probe to switch over (still relies on the probe frequency). This will
+			// guarantee that the peer is actually gone instead of limiting the time the peer can get hold of a
+			// request.
+			if !transactionWasAcquiredByOther {
+				log.Debugw("timeout-no-peer", log.Fields{"txId": c.txnId})
+				res = ABANDONED_BY_OTHER
+			} else {
+				continue
+			}
 		}
 		break
 	}
 	return res
 }
 
-func (c *KVTransaction) deleteTransactionKey() {
-	log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
-	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-	log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
-	ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-}
-
 func (c *KVTransaction) Close() error {
 	log.Debugw("close", log.Fields{"txn": c.txnId})
-	return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+	// Stop monitoring the key (applies only when there has been no transaction switch over)
+	if c.monitorCh != nil {
+		close(c.monitorCh)
+		ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+	}
+	return nil
 }
 
 func (c *KVTransaction) Delete() error {
 	log.Debugw("delete", log.Fields{"txn": c.txnId})
-	err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-	return err
+	return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
+}
+
+// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete.  durationInSecs
+// is used to calculate the frequency at which the Core processing the transaction renews the lease.  This function
+// exits only when the transaction is Closed, i.e. completed.
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+	log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnId})
+	renewInterval := durationInSecs / NUM_TXN_RENEWAL_PER_REQUEST
+	if renewInterval < MIN_TXN_RENEWAL_INTERVAL_IN_SEC {
+		renewInterval = MIN_TXN_RENEWAL_INTERVAL_IN_SEC
+	}
+forLoop:
+	for {
+		select {
+		case <-c.monitorCh:
+			log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnId})
+			break forLoop
+		case <-time.After(time.Duration(renewInterval) * time.Second):
+			if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+				// Log and continue.
+				log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
+			}
+		}
+	}
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index 2311029..e74d1bb 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -131,9 +131,7 @@
 		if err = c.SetTransactionContext(instanceId,
 			txnPrefix,
 			rw.kvClient,
-			rw.config.KVStoreTimeout,
-			rw.config.KVTxnKeyDelTime,
-			1); err != nil {
+			rw.config.KVStoreTimeout); err != nil {
 			log.Fatal("creating-transaction-context-failed")
 		}
 	}