VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Both etcd and consul KV stores are supported
- Incorporated feedback from last code inspection
- Connected Core object to a KV client
- Added docker compose file for etcd testing

Change-Id: I5e3c9637f4e57d6cf7fa1102e4b3507f17bc8979
diff --git a/compose/docker-compose-etcd.yml b/compose/docker-compose-etcd.yml
new file mode 100644
index 0000000..cbcb7d5
--- /dev/null
+++ b/compose/docker-compose-etcd.yml
@@ -0,0 +1,38 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+  #
+  # Single-node etcd server
+  #
+  etcd:
+    image: "quay.io/coreos/etcd:v3.2.9"
+    command: [
+      "etcd",
+      "--name=etcd0",
+      "--advertise-client-urls=http://${DOCKER_HOST_IP}:2379,http://${DOCKER_HOST_IP}:4001",
+      "--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001",
+      "--initial-advertise-peer-urls=http://${DOCKER_HOST_IP}:2380",
+      "--listen-peer-urls=http://0.0.0.0:2380",
+      "--initial-cluster-token=etcd-cluster-1",
+      "--initial-cluster=etcd0=http://${DOCKER_HOST_IP}:2380",
+      "--initial-cluster-state=new"
+    ]
+    ports:
+    - "2379:2379"
+    - 2380
+    - 4001
+
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index a999d7b..feb9879 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -33,10 +33,11 @@
 	default_KafkaAdapterPort = 9092
 	default_KafkaClusterHost = "10.176.215.107"
 	default_KafkaClusterPort = 9094
-	default_KVStoreType      = ConsulStoreName
+	default_KVStoreType      = EtcdStoreName
 	default_KVStoreTimeout   = 5 //in seconds
 	default_KVStoreHost      = "10.176.230.190"
-	default_KVStorePort      = 8500 // Etcd = 2379
+	default_KVStorePort      = 2379 // Consul = 8500; Etcd = 2379
+	default_KVTxnKeyDelTime  = 60
 	default_LogLevel         = 0
 	default_Banner           = false
 	default_CoreTopic        = "rwcore"
@@ -61,6 +62,7 @@
 	KVStoreTimeout   int // in seconds
 	KVStoreHost      string
 	KVStorePort      int
+	KVTxnKeyDelTime  int
 	CoreTopic        string
 	LogLevel         int
 	Banner           bool
@@ -88,6 +90,7 @@
 		KVStoreTimeout:   default_KVStoreTimeout,
 		KVStoreHost:      default_KVStoreHost,
 		KVStorePort:      default_KVStorePort,
+		KVTxnKeyDelTime:  default_KVTxnKeyDelTime,
 		CoreTopic:        default_CoreTopic,
 		LogLevel:         default_LogLevel,
 		Banner:           default_Banner,
@@ -139,6 +142,9 @@
 	help = fmt.Sprintf("KV store port")
 	flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
 
+	help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
+	flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", default_KVTxnKeyDelTime, help)
+
 	help = fmt.Sprintf("Log level")
 	flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
 
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a8d5abf..d75a44f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -19,6 +19,7 @@
 	"context"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
 	"github.com/opencord/voltha-go/protos/voltha"
@@ -39,20 +40,31 @@
 	clusterDataProxy  *model.Proxy
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
+	kvClient          kvstore.Client
 }
 
 func init() {
 	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
-func NewCore(id string, cf *config.RWCoreFlags) *Core {
+func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client) *Core {
 	var core Core
 	core.instanceId = id
 	core.exitChannel = make(chan int, 1)
 	core.config = cf
-	// TODO: Setup the KV store
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+	core.kvClient = kvClient
+
+	// Setup the KV store
+	// Do not call NewBackend constructor; it creates its own KV client
+	backend := model.Backend {
+		Client:     kvClient,
+		StoreType:  cf.KVStoreType,
+		Host:       cf.KVStoreHost,
+		Port:       cf.KVStorePort,
+		Timeout:    cf.KVStoreTimeout,
+		PathPrefix: "service/voltha"}
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
 	core.localDataProxy = core.localDataRoot.GetProxy("/", false)
 	return &core
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 44d9dc1..c2a3634 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -17,9 +17,10 @@
 /*
  * Two voltha cores receive the same request; each tries to acquire ownership of the request
  * by writing its identifier (e.g. container name or pod name) to the transaction key named
- * after the serial number of the request. The core that loses the race to reserve the request
- * watches the progress of the core actually serving the request. Once the request is complete,
- * the serving core closes the transaction by invoking the KVTransaction's Close method, which
+ * after the serial number of the request. The core that loses the race for acquisition
+ * monitors the progress of the core actually serving the request by watching for changes
+ * in the value of the transaction key. Once the request is complete, the
+ * serving core closes the transaction by invoking the KVTransaction's Close method, which
  * replaces the value of the transaction (i.e. serial number) key with the string
  * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
  * and then deletes the transaction key.
@@ -34,8 +35,8 @@
 package core
 
 import (
-    "kvstore"
     "time"
+    "github.com/opencord/voltha-go/db/kvstore"
     log "github.com/opencord/voltha-go/common/log"
 )
 
@@ -53,7 +54,7 @@
 )
 
 type TransactionContext struct {
-    kvClient *kvstore.EtcdClient
+    kvClient kvstore.Client
     kvOperationTimeout int
     owner string
     timeToDeleteCompletedKeys int
@@ -75,7 +76,7 @@
 func NewTransactionContext(
     owner string,
     txnPrefix string,
-    kvClient *kvstore.EtcdClient,
+    kvClient kvstore.Client,
     kvOpTimeout int,
     keyDeleteTime int) *TransactionContext {
 
@@ -104,7 +105,7 @@
  */
 func SetTransactionContext(owner string,
     txnPrefix string,
-    kvClient *kvstore.EtcdClient,
+    kvClient kvstore.Client,
     kvOpTimeout int,
     keyDeleteTime int) error {
 
@@ -112,7 +113,7 @@
     return nil
 }
 
-type EtcdTransaction struct {
+type KVTransaction struct {
     ch chan int
     txnId string
     txnKey string
@@ -122,10 +123,10 @@
  * A KVTransaction constructor
  *
  * :param txnId: The serial number of a voltha request.
- * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
+ * :return: A KVTransaction instance
  */
-func NewEtcdTransaction(txnId string) *EtcdTransaction {
-    return &EtcdTransaction{
+func NewKVTransaction(txnId string) *KVTransaction {
+    return &KVTransaction{
         txnId: txnId,
         txnKey: ctx.txnPrefix + txnId}
 }
@@ -141,7 +142,7 @@
  * :return: true - reservation acquired, process the request
  *          false - reservation not acquired, request being processed by another core
  */
-func (c *EtcdTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(duration int64) bool {
     var acquired bool
     var currOwner string = ""
     var res int
@@ -158,7 +159,7 @@
     // Setting value to nil leads to watch mode
     if value != nil {
         if currOwner, err = kvstore.ToString(value); err != nil {
-            log.Fatal("Unexpected owner type")
+            log.Error("unexpected-owner-type")
             value = nil
         }
     }
@@ -166,23 +167,23 @@
         // Process the request immediately
         res = SEIZED_BY_SELF
     } else {
-        log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
         // Another core instance has reserved the request
         // Watch for reservation expiry or successful request completion
-        // Add a timeout here in case we miss an event from the KV
-        log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
         events := ctx.kvClient.Watch(c.txnKey)
+        log.Debugw("watch-other-server",
+            log.Fields{"owner": currOwner, "timeout": duration})
 
         select {
+        // Add a timeout here in case we miss an event from the KV
         case <-time.After(time.Duration(duration) * time.Millisecond):
             // In case of missing events, let's check the transaction key
             kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
             if err == nil && kvp == nil {
-                log.Debug("Missed DELETED event from KV")
+                log.Debug("missed-deleted-event")
                 res = ABANDONED_BY_OTHER
             } else if val, err := kvstore.ToString(kvp.Value);
                 err == nil && val == TRANSACTION_COMPLETE {
-                    log.Debugw("Missed PUT event from KV",
+                    log.Debugw("missed-put-event",
                         log.Fields{"key": c.txnKey, "value": val})
                     res = COMPLETED_BY_OTHER
             } else {
@@ -190,7 +191,7 @@
             }
 
         case event := <-events:
-            log.Debugw("Received KV event", log.Fields{"type": event.EventType})
+            log.Debugw("received-event", log.Fields{"type": event.EventType})
             if event.EventType == kvstore.DELETE {
                 // The other core failed to process the request; step up
                 res = ABANDONED_BY_OTHER
@@ -209,7 +210,7 @@
     // Clean-up: delete the transaction key after a long delay
     go c.deleteTransactionKey()
 
-    log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
+    log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
     switch res {
     case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
         acquired = true
@@ -219,20 +220,20 @@
     return acquired
 }
 
-func (c *EtcdTransaction) deleteTransactionKey() {
-    log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) deleteTransactionKey() {
+    log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
     time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
-    log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
+    log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
     ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
 }
 
-func (c *EtcdTransaction) Close() error {
-    log.Debugw("Close", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Close() error {
+    log.Debugw("close", log.Fields{"key": c.txnKey})
     return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
 }
 
-func (c *EtcdTransaction) Delete() error {
-    log.Debugw("Delete", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Delete() error {
+    log.Debugw("delete", log.Fields{"key": c.txnKey})
     err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
     return err
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index f53a6ba..cd5dbe9 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -73,6 +73,7 @@
 	addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
 	client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
 	if err != nil {
+		rw.kvClient = nil
 		log.Error(err)
 		return err
 	}
@@ -131,13 +132,23 @@
 	//	log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
 	//}
 
+	// Setup KV Client
+	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
+	err := rw.setKVClient()
+	if err == nil {
+		// Setup KV transaction context
+		c.SetTransactionContext(rw.config.InstanceID,
+			"service/voltha/transactions/",
+			rw.kvClient,
+			rw.config.KVStoreTimeout,
+			rw.config.KVTxnKeyDelTime)
+	}
+
 	// Create the core service
-	rw.core = c.NewCore(rw.config.InstanceID, rw.config)
+	rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
 
 	// start the core
 	rw.core.Start(ctx)
-
-	// Setup KV Client
 }
 
 func (rw *rwCore) stop() {