[VOL-1997] Remove transaction timeout for a non-active rw_core

This commit cleans up the transaction processing between two
cores in a pair.  It prevents the core that is not processing the
request from grabbing the request based on a timeout alone.

Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, as well as some basic manual tests with
kind-voltha.

There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.

Minor updates after first review.
Comment updates after second review.

Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 11f3d4e..1b4370d 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -25,12 +25,6 @@
  * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
  * and then deletes the transaction key.
  *
- * To ensure the key is removed despite possible standby core failures, a KV operation is
- * scheduled in the background on both cores to delete the key well after the transaction is
- * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
- * long enough, on the order of many seconds, to ensure the standby sees the transaction
- * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
- * the KV store.
  */
 package core
 
@@ -48,23 +42,36 @@
 	SEIZED_BY_SELF
 	COMPLETED_BY_OTHER
 	ABANDONED_BY_OTHER
-	STOPPED_WATCHING_KEY
-	STOPPED_WAITING_FOR_KEY
+	ABANDONED_WATCH_BY_SELF
 )
 
 var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+var errorTransactionInvalidId = status.Error(codes.Canceled, "transaction-invalid-id")
 
 const (
 	TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
 )
 
+// Transaction constants used to guarantee that the Core processing a request holds on to the transaction until
+// it either completes it (either successfully or times out) or the Core itself crashes (e.g. a server failure).
+// If a request has a timeout of x seconds then the Core processing the request will renew the transaction lease
+// every x/NUM_TXN_RENEWAL_PER_REQUEST seconds. After the Core completes the request it stops renewing the
+// transaction and sets the transaction value to TRANSACTION_COMPLETE. If the processing Core crashes then it
+// will not renew the transaction, causing the KV store to delete the transaction after its renewal period.  The
+// Core watching the transaction will then take over.
+// Since MIN_TXN_RENEWAL_INTERVAL_IN_SEC is 3 seconds, no transaction renewal is done for any transaction that
+// completes within 3 seconds.
+const (
+	NUM_TXN_RENEWAL_PER_REQUEST         = 2
+	MIN_TXN_RENEWAL_INTERVAL_IN_SEC     = 3
+	MIN_TXN_RESERVATION_DURATION_IN_SEC = 5
+)
+
 type TransactionContext struct {
-	kvClient                  kvstore.Client
-	kvOperationTimeout        int
-	monitorLoopTime           int64
-	owner                     string
-	timeToDeleteCompletedKeys int
-	txnPrefix                 string
+	kvClient           kvstore.Client
+	kvOperationTimeout int
+	owner              string
+	txnPrefix          string
 }
 
 var ctx *TransactionContext
@@ -74,8 +81,7 @@
 	"SEIZED-BY-SELF",
 	"COMPLETED-BY-OTHER",
 	"ABANDONED-BY-OTHER",
-	"STOPPED-WATCHING-KEY",
-	"STOPPED-WAITING-FOR-KEY"}
+	"ABANDONED_WATCH_BY_SELF"}
 
 func init() {
 	log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -85,17 +91,13 @@
 	owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
-	kvOpTimeout int,
-	keyDeleteTime int,
-	monLoopTime int64) *TransactionContext {
+	kvOpTimeout int) *TransactionContext {
 
 	return &TransactionContext{
-		owner:                     owner,
-		txnPrefix:                 txnPrefix,
-		kvClient:                  kvClient,
-		kvOperationTimeout:        kvOpTimeout,
-		monitorLoopTime:           monLoopTime,
-		timeToDeleteCompletedKeys: keyDeleteTime}
+		owner:              owner,
+		txnPrefix:          txnPrefix,
+		kvClient:           kvClient,
+		kvOperationTimeout: kvOpTimeout}
 }
 
 /*
@@ -110,26 +112,20 @@
  *                  only the etcd client is supported.
  * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
  *                      used by this package
- * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
- *                       a TRANSACTION_COMPLETE key
- * :param monLoopTime: The time in milliseconds that the monitor sleeps between
- *                     checks for the existence of the transaction key
  */
 func SetTransactionContext(owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
-	kvOpTimeout int,
-	keyDeleteTime int,
-	monLoopTime int64) error {
+	kvOpTimeout int) error {
 
-	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
+	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout)
 	return nil
 }
 
 type KVTransaction struct {
-	ch     chan int
-	txnId  string
-	txnKey string
+	monitorCh chan int
+	txnId     string
+	txnKey    string
 }
 
 /*
@@ -145,128 +141,117 @@
 }
 
 /*
- * This function returns a boolean indicating whether or not the caller should process
- * the request. True is returned in one of two cases:
- * (1) The current core successfully reserved the request's serial number with the KV store
- * (2) The current core failed in its reservation attempt but observed that the serving core
- *     has abandoned processing the request
+ * Acquired is invoked by a Core, upon reception of a request, to reserve the transaction key in the KV store. The
+ * request may be resource specific (i.e will include an ID for that resource) or may be generic (i.e. list a set of
+ * resources). If the request is resource specific then this function should be invoked with the ownedByMe flag to
+ * indicate whether this Core owns this resource.  In the case where this Core owns this resource or it is a generic
+ * request then we will proceed to reserve the transaction key in the KV store for a minimum time specified by the
+ * minDuration param.  If the reservation request fails (i.e. the other Core got the reservation before this one - this
+ * can happen only for generic request) then the Core will start to watch for changes to the key to determine
+ * whether the other Core completed the transaction successfully or the Core just died.  If the Core does not own the
+ * resource then we will proceed to watch the transaction key.
  *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation acquired, process the request
- *          false - reservation not acquired, request being processed by another core
+ * :param minDuration: minimum time to reserve the transaction key in the KV store
+ * :param ownedByMe: specify whether the request is about a resource owned or not. If it's absent then this is a
+ * generic request that has no specific resource ID (e.g. list)
+ *
+ * :return: A boolean specifying whether the resource was acquired. An error is returned in case this function is invoked
+ * for a resource that is nonexistent.
  */
-func (c *KVTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
 	var acquired bool
 	var currOwner string = ""
 	var res int
 
 	// Convert milliseconds to seconds, rounding up
 	// The reservation TTL is specified in seconds
-	durationInSecs := duration / 1000
-	if remainder := duration % 1000; remainder > 0 {
+	durationInSecs := minDuration / 1000
+	if remainder := minDuration % 1000; remainder > 0 {
 		durationInSecs++
 	}
-	value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+	if durationInSecs < int64(MIN_TXN_RESERVATION_DURATION_IN_SEC) {
+		durationInSecs = int64(MIN_TXN_RESERVATION_DURATION_IN_SEC)
+	}
+	genericRequest := true
+	resourceOwned := false
+	if len(ownedByMe) > 0 {
+		genericRequest = false
+		resourceOwned = ownedByMe[0]
+	}
+	if resourceOwned || genericRequest {
+		// Keep the reservation longer than the minDuration (which is really the request timeout) to ensure the
+		// transaction key stays in the KV store until after the Core finalizes a request timeout condition (which is
+		// a success from a request completion perspective).
+		if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+			res = SEIZED_BY_SELF
+		} else {
+			log.Debugw("watch-other-server",
+				log.Fields{"transactionId": c.txnId, "owner": currOwner, "timeout": durationInSecs})
+			res = c.Watch(durationInSecs)
+		}
+	} else {
+		res = c.Watch(durationInSecs)
+	}
+	switch res {
+	case SEIZED_BY_SELF, ABANDONED_BY_OTHER:
+		acquired = true
+	default:
+		acquired = false
+	}
+	log.Debugw("acquire-transaction-status", log.Fields{"transactionId": c.txnId, "acquired": acquired, "result": txnState[res]})
+	return acquired, nil
+}
 
-	// If the reservation failed, do we simply abort or drop into watch mode anyway?
-	// Setting value to nil leads to watch mode
+func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+	var currOwner string = ""
+	var res int
+	value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
 	if value != nil {
-		if currOwner, err = kvstore.ToString(value); err != nil {
-			log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
-			value = nil
+		if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
+			log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnId, "error": err})
+			return err
+		}
+		if currOwner == ctx.owner {
+			log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnId, "result": txnState[res]})
+			// Setup the monitoring channel
+			c.monitorCh = make(chan int)
+			go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+			return nil
 		}
 	}
-	if err == nil && value != nil && currOwner == ctx.owner {
-		// Process the request immediately
-		res = SEIZED_BY_SELF
-	} else {
-		// Another core instance has reserved the request
-		// Watch for reservation expiry or successful request completion
-		log.Debugw("watch-other-server",
-			log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
-
-		res = c.Watch(duration)
-	}
-	// Clean-up: delete the transaction key after a long delay
-	go c.deleteTransactionKey()
-
-	log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
-	switch res {
-	case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
-		acquired = true
-	default:
-		acquired = false
-	}
-	// Ensure the request watcher does not reply before the request server
-	if !acquired {
-		log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
-	}
-	return acquired
+	return status.Error(codes.PermissionDenied, "reservation-denied")
 }
 
-/*
- * This function monitors the progress of a request that's been reserved by another
- * Voltha core.
- *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation abandoned by the other core, process the request
- *          false - reservation not owned, request being processed by another core
- */
-func (c *KVTransaction) Monitor(duration int64) bool {
-	var acquired bool
-	var res int
-
-	// Convert milliseconds to seconds, rounding up
-	// The reservation TTL is specified in seconds
-	durationInSecs := duration / 1000
-	if remainder := duration % 1000; remainder > 0 {
-		durationInSecs++
-	}
-
-	res = c.Watch(duration)
-
-	// Clean-up: delete the transaction key after a long delay
-	go c.deleteTransactionKey()
-
-	log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
-	switch res {
-	case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
-		acquired = true
-	default:
-		acquired = false
-	}
-	// Ensure the request watcher does not reply before the request server
-	if !acquired {
-		log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
-	}
-	return acquired
-}
-
-// duration in milliseconds
-func (c *KVTransaction) Watch(duration int64) int {
+func (c *KVTransaction) Watch(durationInSecs int64) int {
 	var res int
 
 	events := ctx.kvClient.Watch(c.txnKey)
 	defer ctx.kvClient.CloseWatch(c.txnKey, events)
 
+	transactionWasAcquiredByOther := false
+
+	//Check whether the transaction was already completed by the other Core before we got here.
+	if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+		transactionWasAcquiredByOther = true
+		if val, err := kvstore.ToString(kvp.Value); err == nil {
+			if val == TRANSACTION_COMPLETE {
+				res = COMPLETED_BY_OTHER
+				// Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
+				c.Delete()
+				return res
+			}
+		} else {
+			// An unexpected value - let's get out of here as something did not go according to plan
+			res = ABANDONED_WATCH_BY_SELF
+			log.Debugw("cannot-read-transaction-value", log.Fields{"txn": c.txnId, "error": err})
+			return res
+		}
+	}
+
 	for {
 		select {
-		// Add a timeout here in case we miss an event from the KV
-		case <-time.After(time.Duration(duration) * time.Millisecond):
-			// In case of missing events, let's check the transaction key
-			kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
-			if err == nil && kvp == nil {
-				log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
-				res = ABANDONED_BY_OTHER
-			} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
-				log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
-				res = COMPLETED_BY_OTHER
-			} else {
-				log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
-				res = STOPPED_WATCHING_KEY
-			}
-
 		case event := <-events:
+			transactionWasAcquiredByOther = true
 			log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
 			if event.EventType == kvstore.DELETE {
 				// The other core failed to process the request
@@ -274,39 +259,73 @@
 			} else if event.EventType == kvstore.PUT {
 				key, e1 := kvstore.ToString(event.Key)
 				val, e2 := kvstore.ToString(event.Value)
-				if e1 == nil && key == c.txnKey && e2 == nil {
+				if e1 == nil && e2 == nil && key == c.txnKey {
 					if val == TRANSACTION_COMPLETE {
 						res = COMPLETED_BY_OTHER
-						// Successful request completion has been detected
-						// Remove the transaction key
+						// Successful request completion has been detected. Remove the transaction key
 						c.Delete()
 					} else {
-						log.Debugf("Ignoring reservation PUT event with val %v for key %v",
-							val, key)
+						log.Debugw("Ignoring-PUT-event", log.Fields{"val": val, "key": key})
 						continue
 					}
+				} else {
+					log.Warnw("received-unexpected-PUT-event", log.Fields{"txn": c.txnId, "key": key, "ctxKey": c.txnKey})
 				}
 			}
+		case <-time.After(time.Duration(durationInSecs) * time.Second):
+			// Corner case: In the case where the Core owning the device dies and before this Core takes ownership of
+			// this device there is a window where new requests will end up being watched instead of being processed.
+			// Grab the request if the other Core did not create the transaction in the KV store.
+			// TODO: Use a peer-monitoring probe to switch over (still relies on the probe frequency). This will
+			// guarantee that the peer is actually gone instead of limiting the time the peer can get hold of a
+			// request.
+			if !transactionWasAcquiredByOther {
+				log.Debugw("timeout-no-peer", log.Fields{"txId": c.txnId})
+				res = ABANDONED_BY_OTHER
+			} else {
+				continue
+			}
 		}
 		break
 	}
 	return res
 }
 
-func (c *KVTransaction) deleteTransactionKey() {
-	log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
-	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-	log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
-	ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-}
-
 func (c *KVTransaction) Close() error {
 	log.Debugw("close", log.Fields{"txn": c.txnId})
-	return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+	// Stop monitoring the key (applies only when there has been no transaction switch over)
+	if c.monitorCh != nil {
+		close(c.monitorCh)
+		ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+	}
+	return nil
 }
 
 func (c *KVTransaction) Delete() error {
 	log.Debugw("delete", log.Fields{"txn": c.txnId})
-	err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-	return err
+	return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
+}
+
+// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete.  durationInSecs
+// is used to calculate the frequency at which the Core processing the transaction renews the lease.  This function
+// exits only when the transaction is Closed, i.e completed.
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+	log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnId})
+	renewInterval := durationInSecs / NUM_TXN_RENEWAL_PER_REQUEST
+	if renewInterval < MIN_TXN_RENEWAL_INTERVAL_IN_SEC {
+		renewInterval = MIN_TXN_RENEWAL_INTERVAL_IN_SEC
+	}
+forLoop:
+	for {
+		select {
+		case <-c.monitorCh:
+			log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnId})
+			break forLoop
+		case <-time.After(time.Duration(renewInterval) * time.Second):
+			if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+				// Log and continue.
+				log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
+			}
+		}
+	}
 }