VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- Need to make prefix & timeout for the device ownership key configurable,
  currently hard-coded
- Need to make KV Transaction Monitor timeout configurable,
  currently hard-coded
- Need to clean up AdapterRequestHandlerProxy & LogicalDeviceManager
  constructors

Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index e7125d3..ec7e4ca 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -56,6 +56,7 @@
 type TransactionContext struct {
 	kvClient                  kvstore.Client
 	kvOperationTimeout        int
+	monitorLoopTime           int64
 	owner                     string
 	timeToDeleteCompletedKeys int
 	txnPrefix                 string
@@ -71,7 +72,7 @@
 	"STOPPED-WAITING-FOR-OTHER"}
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 func NewTransactionContext(
@@ -79,13 +80,15 @@
 	txnPrefix string,
 	kvClient kvstore.Client,
 	kvOpTimeout int,
-	keyDeleteTime int) *TransactionContext {
+	keyDeleteTime int,
+	monLoopTime int64) *TransactionContext {
 
 	return &TransactionContext{
 		owner:                     owner,
 		txnPrefix:                 txnPrefix,
 		kvClient:                  kvClient,
 		kvOperationTimeout:        kvOpTimeout,
+		monitorLoopTime:           monLoopTime,
 		timeToDeleteCompletedKeys: keyDeleteTime}
 }
 
@@ -99,18 +102,21 @@
  *                   will be created (e.g. "service/voltha/transactions")
  * :param kvClient: The client API used for all interactions with the KV store. Currently
  *                  only the etcd client is supported.
- * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
- *                      package
- * :param keyDeleteTime: The time to wait, in the background, before deleting a
- *                       TRANSACTION_COMPLETE key
+ * :param kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
+ *                     used by this package
+ * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
+ *                       a TRANSACTION_COMPLETE key
+ * :param monLoopTime: The time in milliseconds that the monitor sleeps between
+ *                     checks for the existence of the transaction key
  */
 func SetTransactionContext(owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
 	kvOpTimeout int,
-	keyDeleteTime int) error {
+	keyDeleteTime int,
+	monLoopTime int64) error {
 
-	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
 	return nil
 }
 
@@ -170,42 +176,10 @@
 	} else {
 		// Another core instance has reserved the request
 		// Watch for reservation expiry or successful request completion
-		events := ctx.kvClient.Watch(c.txnKey)
 		log.Debugw("watch-other-server",
 			log.Fields{"owner": currOwner, "timeout": duration})
 
-		select {
-		// Add a timeout here in case we miss an event from the KV
-		case <-time.After(time.Duration(duration) * time.Millisecond):
-			// In case of missing events, let's check the transaction key
-			kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
-			if err == nil && kvp == nil {
-				log.Debug("missed-deleted-event")
-				res = ABANDONED_BY_OTHER
-			} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
-				log.Debugw("missed-put-event",
-					log.Fields{"key": c.txnKey, "value": val})
-				res = COMPLETED_BY_OTHER
-			} else {
-				res = STOPPED_WAITING_FOR_OTHER
-			}
-
-		case event := <-events:
-			log.Debugw("received-event", log.Fields{"type": event.EventType})
-			if event.EventType == kvstore.DELETE {
-				// The other core failed to process the request; step up
-				res = ABANDONED_BY_OTHER
-			} else if event.EventType == kvstore.PUT {
-				key, e1 := kvstore.ToString(event.Key)
-				val, e2 := kvstore.ToString(event.Value)
-				if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
-					res = COMPLETED_BY_OTHER
-					// Successful request completion has been detected
-					// Remove the transaction key
-					c.Delete()
-				}
-			}
-		}
+		res = c.Watch(duration)
 	}
 	// Clean-up: delete the transaction key after a long delay
 	go c.deleteTransactionKey()
@@ -224,6 +198,103 @@
 	return acquired
 }
 
+/*
+ * Monitor waits on a request reserved by another Voltha core: it polls until the
+ * transaction key exists, then calls Watch on it to learn how the reservation ended.
+ *
+ * :param duration: The duration of the reservation in milliseconds
+ * :return: true - reservation abandoned or waiting stopped, process the request
+ *          false - any other outcome (e.g. request completed by another core)
+ */
+func (c *KVTransaction) Monitor(duration int64) bool {
+	var acquired bool
+	var res int
+	var timeElapsed int64
+
+	// Convert milliseconds to seconds, rounding up (the reservation TTL is in seconds)
+	// NOTE(review): durationInSecs is never read after this computation
+	durationInSecs := duration / 1000
+	if remainder := duration % 1000; remainder > 0 {
+		durationInSecs++
+	}
+	// Poll for the transaction key, sleeping monitorLoopTime ms between checks
+	keyExists := false
+	for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		if err == nil && kvp == nil {
+			// No key yet: this core received the request before the owning core seized
+			// the transaction, so sleep and retry. A Get error takes the else branch.
+			time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
+		} else {
+			keyExists = true
+			log.Debug("waited-for-other-to-reserve-transaction")
+			break
+		}
+	}
+	if keyExists {
+		// Key found: watch it for reservation expiry or successful request completion
+		log.Debugw("watch-other-server", log.Fields{"timeout": duration})
+		res = c.Watch(duration)
+	} else {
+		res = STOPPED_WAITING_FOR_OTHER
+	}
+	// Clean-up: schedule the delayed transaction key deletion in the background
+	go c.deleteTransactionKey()
+
+	log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+	switch res {
+	case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+		acquired = true
+	default:
+		acquired = false
+	}
+	// Hold back one second so the request watcher does not reply before the request server
+	if !acquired {
+		time.Sleep(1 * time.Second)
+	}
+	return acquired
+}
+
+// Watch waits up to duration milliseconds for a KV event on the transaction key and returns the resulting state.
+func (c *KVTransaction) Watch(duration int64) int {
+	var res int
+
+	events := ctx.kvClient.Watch(c.txnKey)
+	select {
+	// Add a timeout here in case we miss an event from the KV
+	case <-time.After(time.Duration(duration) * time.Millisecond):
+		// Timed out: read the key directly. NOTE(review): kvp may be nil below if Get fails - confirm
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		if err == nil && kvp == nil {
+			log.Debug("missed-deleted-event")
+			res = ABANDONED_BY_OTHER
+		} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+			log.Debugw("missed-put-event",
+				log.Fields{"key": c.txnKey, "value": val})
+			res = COMPLETED_BY_OTHER
+		} else {
+			res = STOPPED_WAITING_FOR_OTHER
+		}
+
+	case event := <-events:
+		log.Debugw("received-event", log.Fields{"type": event.EventType})
+		if event.EventType == kvstore.DELETE {
+			// The key was deleted: the other core failed to process the request
+			res = ABANDONED_BY_OTHER
+		} else if event.EventType == kvstore.PUT {
+			key, e1 := kvstore.ToString(event.Key)
+			val, e2 := kvstore.ToString(event.Value)
+			if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+				res = COMPLETED_BY_OTHER
+				// Successful request completion has been detected; remove the transaction key
+				// NOTE(review): any other PUT leaves res at its zero value - confirm intended
+				c.Delete()
+			}
+		}
+	}
+	return res
+}
+
 func (c *KVTransaction) deleteTransactionKey() {
 	log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
 	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)