VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Both etcd and consul KV stores are supported
- Incorporated feedback from last code inspection
- Connected Core object to a KV client
- Added docker compose file for etcd testing

Change-Id: I5e3c9637f4e57d6cf7fa1102e4b3507f17bc8979
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 44d9dc1..c2a3634 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -17,9 +17,10 @@
 /*
  * Two voltha cores receive the same request; each tries to acquire ownership of the request
  * by writing its identifier (e.g. container name or pod name) to the transaction key named
- * after the serial number of the request. The core that loses the race to reserve the request
- * watches the progress of the core actually serving the request. Once the request is complete,
- * the serving core closes the transaction by invoking the KVTransaction's Close method, which
+ * after the serial number of the request. The core that loses the race for acquisition
+ * monitors the progress of the core actually serving the request by watching for changes
+ * in the value of the transaction key. Once the request is complete, the
+ * serving core closes the transaction by invoking the KVTransaction's Close method, which
  * replaces the value of the transaction (i.e. serial number) key with the string
  * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
  * and then deletes the transaction key.
@@ -34,8 +35,8 @@
 package core
 
 import (
-    "kvstore"
     "time"
+    "github.com/opencord/voltha-go/db/kvstore"
     log "github.com/opencord/voltha-go/common/log"
 )
 
@@ -53,7 +54,7 @@
 )
 
 type TransactionContext struct {
-    kvClient *kvstore.EtcdClient
+    kvClient kvstore.Client
     kvOperationTimeout int
     owner string
     timeToDeleteCompletedKeys int
@@ -75,7 +76,7 @@
 func NewTransactionContext(
     owner string,
     txnPrefix string,
-    kvClient *kvstore.EtcdClient,
+    kvClient kvstore.Client,
     kvOpTimeout int,
     keyDeleteTime int) *TransactionContext {
 
@@ -104,7 +105,7 @@
  */
 func SetTransactionContext(owner string,
     txnPrefix string,
-    kvClient *kvstore.EtcdClient,
+    kvClient kvstore.Client,
     kvOpTimeout int,
     keyDeleteTime int) error {
 
@@ -112,7 +113,7 @@
     return nil
 }
 
-type EtcdTransaction struct {
+type KVTransaction struct {
     ch chan int
     txnId string
     txnKey string
@@ -122,10 +123,10 @@
  * A KVTransaction constructor
  *
  * :param txnId: The serial number of a voltha request.
- * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
+ * :return: A KVTransaction instance
  */
-func NewEtcdTransaction(txnId string) *EtcdTransaction {
-    return &EtcdTransaction{
+func NewKVTransaction(txnId string) *KVTransaction {
+    return &KVTransaction{
         txnId: txnId,
         txnKey: ctx.txnPrefix + txnId}
 }
@@ -141,7 +142,7 @@
  * :return: true - reservation acquired, process the request
  *          false - reservation not acquired, request being processed by another core
  */
-func (c *EtcdTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(duration int64) bool {
     var acquired bool
     var currOwner string = ""
     var res int
@@ -158,7 +159,7 @@
     // Setting value to nil leads to watch mode
     if value != nil {
         if currOwner, err = kvstore.ToString(value); err != nil {
-            log.Fatal("Unexpected owner type")
+            log.Error("unexpected-owner-type")
             value = nil
         }
     }
@@ -166,23 +167,23 @@
         // Process the request immediately
         res = SEIZED_BY_SELF
     } else {
-        log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
         // Another core instance has reserved the request
         // Watch for reservation expiry or successful request completion
-        // Add a timeout here in case we miss an event from the KV
-        log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
         events := ctx.kvClient.Watch(c.txnKey)
+        log.Debugw("watch-other-server",
+            log.Fields{"owner": currOwner, "timeout": duration})
 
         select {
+        // Bound the wait with a timeout in case we miss an event from the KV
         case <-time.After(time.Duration(duration) * time.Millisecond):
             // In case of missing events, let's check the transaction key
             kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
             if err == nil && kvp == nil {
-                log.Debug("Missed DELETED event from KV")
+                log.Debug("missed-deleted-event")
                 res = ABANDONED_BY_OTHER
             } else if val, err := kvstore.ToString(kvp.Value);
                 err == nil && val == TRANSACTION_COMPLETE {
-                    log.Debugw("Missed PUT event from KV",
+                    log.Debugw("missed-put-event",
                         log.Fields{"key": c.txnKey, "value": val})
                     res = COMPLETED_BY_OTHER
             } else {
@@ -190,7 +191,7 @@
             }
 
         case event := <-events:
-            log.Debugw("Received KV event", log.Fields{"type": event.EventType})
+            log.Debugw("received-event", log.Fields{"type": event.EventType})
             if event.EventType == kvstore.DELETE {
                 // The other core failed to process the request; step up
                 res = ABANDONED_BY_OTHER
@@ -209,7 +210,7 @@
     // Clean-up: delete the transaction key after a long delay
     go c.deleteTransactionKey()
 
-    log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
+    log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
     switch res {
     case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
         acquired = true
@@ -219,20 +220,20 @@
     return acquired
 }
 
-func (c *EtcdTransaction) deleteTransactionKey() {
-    log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) deleteTransactionKey() {
+    log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
     time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-    log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
+    log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
     ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
 }
 
-func (c *EtcdTransaction) Close() error {
-    log.Debugw("Close", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Close() error {
+    log.Debugw("close", log.Fields{"key": c.txnKey})
     return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
 }
 
-func (c *EtcdTransaction) Delete() error {
-    log.Debugw("Delete", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Delete() error {
+    log.Debugw("delete", log.Fields{"key": c.txnKey})
     err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
     return err
 }