[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively process a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watch for
updates on that device and will take over in case the owner Core
failed to process the transaction.
2) Cleanup the lock mechanisms to ensure we use a read lock when
needed instead of just a lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tune up.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK.  This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index ec7e4ca..12bf93e 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -46,7 +46,8 @@
 	SEIZED_BY_SELF
 	COMPLETED_BY_OTHER
 	ABANDONED_BY_OTHER
-	STOPPED_WAITING_FOR_OTHER
+	STOPPED_WATCHING_KEY
+	STOPPED_WAITING_FOR_KEY
 )
 
 const (
@@ -69,7 +70,8 @@
 	"SEIZED-BY-SELF",
 	"COMPLETED-BY-OTHER",
 	"ABANDONED-BY-OTHER",
-	"STOPPED-WAITING-FOR-OTHER"}
+	"STOPPED-WATCHING-KEY",
+	"STOPPED-WAITING-FOR-KEY"}
 
 func init() {
 	log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -166,7 +168,7 @@
 	// Setting value to nil leads to watch mode
 	if value != nil {
 		if currOwner, err = kvstore.ToString(value); err != nil {
-			log.Error("unexpected-owner-type")
+			log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
 			value = nil
 		}
 	}
@@ -177,16 +179,16 @@
 		// Another core instance has reserved the request
 		// Watch for reservation expiry or successful request completion
 		log.Debugw("watch-other-server",
-			log.Fields{"owner": currOwner, "timeout": duration})
+			log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
 
 		res = c.Watch(duration)
 	}
 	// Clean-up: delete the transaction key after a long delay
 	go c.deleteTransactionKey()
 
-	log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
+	log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
 	switch res {
-	case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+	case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
 		acquired = true
 	default:
 		acquired = false
@@ -209,7 +211,6 @@
 func (c *KVTransaction) Monitor(duration int64) bool {
 	var acquired bool
 	var res int
-	var timeElapsed int64
 
 	// Convert milliseconds to seconds, rounding up
 	// The reservation TTL is specified in seconds
@@ -217,33 +218,15 @@
 	if remainder := duration % 1000; remainder > 0 {
 		durationInSecs++
 	}
-	// Check if transaction key has been set
-	keyExists := false
-	for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
-		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
-		if err == nil && kvp == nil {
-			// This core has received the request before the core that actually
-			// owns the device. The owning core has yet to seize the transaction.
-			time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
-		} else {
-			keyExists = true
-			log.Debug("waited-for-other-to-reserve-transaction")
-			break
-		}
-	}
-	if keyExists {
-		// Watch for reservation expiry or successful request completion
-		log.Debugw("watch-other-server", log.Fields{"timeout": duration})
-		res = c.Watch(duration)
-	} else {
-		res = STOPPED_WAITING_FOR_OTHER
-	}
+
+	res = c.Watch(duration)
+
 	// Clean-up: delete the transaction key after a long delay
 	go c.deleteTransactionKey()
 
-	log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+	log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
 	switch res {
-	case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+	case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
 		acquired = true
 	default:
 		acquired = false
@@ -266,18 +249,18 @@
 		// In case of missing events, let's check the transaction key
 		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
 		if err == nil && kvp == nil {
-			log.Debug("missed-deleted-event")
+			log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
 			res = ABANDONED_BY_OTHER
 		} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
-			log.Debugw("missed-put-event",
-				log.Fields{"key": c.txnKey, "value": val})
+			log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
 			res = COMPLETED_BY_OTHER
 		} else {
-			res = STOPPED_WAITING_FOR_OTHER
+			log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
+			res = STOPPED_WATCHING_KEY
 		}
 
 	case event := <-events:
-		log.Debugw("received-event", log.Fields{"type": event.EventType})
+		log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
 		if event.EventType == kvstore.DELETE {
 			// The other core failed to process the request
 			res = ABANDONED_BY_OTHER
@@ -296,19 +279,19 @@
 }
 
 func (c *KVTransaction) deleteTransactionKey() {
-	log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
+	log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
 	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-	log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
+	log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
 	ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
 }
 
 func (c *KVTransaction) Close() error {
-	log.Debugw("close", log.Fields{"key": c.txnKey})
+	log.Debugw("close", log.Fields{"txn": c.txnId})
 	return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
 }
 
 func (c *KVTransaction) Delete() error {
-	log.Debugw("delete", log.Fields{"key": c.txnKey})
+	log.Debugw("delete", log.Fields{"txn": c.txnId})
 	err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
 	return err
 }