VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Both etcd and consul KV stores are supported
- Incorporated feedback from last code inspection
- Connected Core object to a KV client
- Added docker compose file for etcd testing
Change-Id: I5e3c9637f4e57d6cf7fa1102e4b3507f17bc8979
diff --git a/compose/docker-compose-etcd.yml b/compose/docker-compose-etcd.yml
new file mode 100644
index 0000000..cbcb7d5
--- /dev/null
+++ b/compose/docker-compose-etcd.yml
@@ -0,0 +1,38 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+ #
+ # Single-node etcd server
+ #
+ etcd:
+ image: "quay.io/coreos/etcd:v3.2.9"
+ command: [
+ "etcd",
+ "--name=etcd0",
+ "--advertise-client-urls=http://${DOCKER_HOST_IP}:2379,http://${DOCKER_HOST_IP}:4001",
+ "--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001",
+ "--initial-advertise-peer-urls=http://${DOCKER_HOST_IP}:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://${DOCKER_HOST_IP}:2380",
+ "--initial-cluster-state=new"
+ ]
+ ports:
+ - "2379:2379"
+ - 2380
+ - 4001
+
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index a999d7b..feb9879 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -33,10 +33,11 @@
default_KafkaAdapterPort = 9092
default_KafkaClusterHost = "10.176.215.107"
default_KafkaClusterPort = 9094
- default_KVStoreType = ConsulStoreName
+ default_KVStoreType = EtcdStoreName
default_KVStoreTimeout = 5 //in seconds
default_KVStoreHost = "10.176.230.190"
- default_KVStorePort = 8500 // Etcd = 2379
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
default_LogLevel = 0
default_Banner = false
default_CoreTopic = "rwcore"
@@ -61,6 +62,7 @@
KVStoreTimeout int // in seconds
KVStoreHost string
KVStorePort int
+ KVTxnKeyDelTime int
CoreTopic string
LogLevel int
Banner bool
@@ -88,6 +90,7 @@
KVStoreTimeout: default_KVStoreTimeout,
KVStoreHost: default_KVStoreHost,
KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
CoreTopic: default_CoreTopic,
LogLevel: default_LogLevel,
Banner: default_Banner,
@@ -139,6 +142,9 @@
help = fmt.Sprintf("KV store port")
flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
+ help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
+ flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", default_KVTxnKeyDelTime, help)
+
help = fmt.Sprintf("Log level")
flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a8d5abf..d75a44f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -19,6 +19,7 @@
"context"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
"github.com/opencord/voltha-go/protos/voltha"
@@ -39,20 +40,31 @@
clusterDataProxy *model.Proxy
localDataProxy *model.Proxy
exitChannel chan int
+ kvClient kvstore.Client
}
func init() {
log.AddPackage(log.JSON, log.WarnLevel, nil)
}
-func NewCore(id string, cf *config.RWCoreFlags) *Core {
+func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client) *Core {
var core Core
core.instanceId = id
core.exitChannel = make(chan int, 1)
core.config = cf
- // TODO: Setup the KV store
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+ core.kvClient = kvClient
+
+ // Setup the KV store
+ // Do not call NewBackend constructor; it creates its own KV client
+ backend := model.Backend {
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
core.localDataProxy = core.localDataRoot.GetProxy("/", false)
return &core
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 44d9dc1..c2a3634 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -17,9 +17,10 @@
/*
* Two voltha cores receive the same request; each tries to acquire ownership of the request
* by writing its identifier (e.g. container name or pod name) to the transaction key named
- * after the serial number of the request. The core that loses the race to reserve the request
- * watches the progress of the core actually serving the request. Once the request is complete,
- * the serving core closes the transaction by invoking the KVTransaction's Close method, which
+ * after the serial number of the request. The core that loses the race for acquisition
+ * monitors the progress of the core actually serving the request by watching for changes
+ * in the value of the transaction key. Once the request is complete, the
+ * serving core closes the transaction by invoking the KVTransaction's Close method, which
* replaces the value of the transaction (i.e. serial number) key with the string
* TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
* and then deletes the transaction key.
@@ -34,8 +35,8 @@
package core
import (
- "kvstore"
"time"
+ "github.com/opencord/voltha-go/db/kvstore"
log "github.com/opencord/voltha-go/common/log"
)
@@ -53,7 +54,7 @@
)
type TransactionContext struct {
- kvClient *kvstore.EtcdClient
+ kvClient kvstore.Client
kvOperationTimeout int
owner string
timeToDeleteCompletedKeys int
@@ -75,7 +76,7 @@
func NewTransactionContext(
owner string,
txnPrefix string,
- kvClient *kvstore.EtcdClient,
+ kvClient kvstore.Client,
kvOpTimeout int,
keyDeleteTime int) *TransactionContext {
@@ -104,7 +105,7 @@
*/
func SetTransactionContext(owner string,
txnPrefix string,
- kvClient *kvstore.EtcdClient,
+ kvClient kvstore.Client,
kvOpTimeout int,
keyDeleteTime int) error {
@@ -112,7 +113,7 @@
return nil
}
-type EtcdTransaction struct {
+type KVTransaction struct {
ch chan int
txnId string
txnKey string
@@ -122,10 +123,10 @@
* A KVTransaction constructor
*
* :param txnId: The serial number of a voltha request.
- * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
+ * :return: A KVTransaction instance
*/
-func NewEtcdTransaction(txnId string) *EtcdTransaction {
- return &EtcdTransaction{
+func NewKVTransaction(txnId string) *KVTransaction {
+ return &KVTransaction{
txnId: txnId,
txnKey: ctx.txnPrefix + txnId}
}
@@ -141,7 +142,7 @@
* :return: true - reservation acquired, process the request
* false - reservation not acquired, request being processed by another core
*/
-func (c *EtcdTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(duration int64) bool {
var acquired bool
var currOwner string = ""
var res int
@@ -158,7 +159,7 @@
// Setting value to nil leads to watch mode
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil {
- log.Fatal("Unexpected owner type")
+ log.Error("unexpected-owner-type")
value = nil
}
}
@@ -166,23 +167,23 @@
// Process the request immediately
res = SEIZED_BY_SELF
} else {
- log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
// Another core instance has reserved the request
// Watch for reservation expiry or successful request completion
- // Add a timeout here in case we miss an event from the KV
- log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
events := ctx.kvClient.Watch(c.txnKey)
+ log.Debugw("watch-other-server",
+ log.Fields{"owner": currOwner, "timeout": duration})
select {
+ // Add a timeout here in case we miss an event from the KV
case <-time.After(time.Duration(duration) * time.Millisecond):
// In case of missing events, let's check the transaction key
kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
if err == nil && kvp == nil {
- log.Debug("Missed DELETED event from KV")
+ log.Debug("missed-deleted-event")
res = ABANDONED_BY_OTHER
} else if val, err := kvstore.ToString(kvp.Value);
err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("Missed PUT event from KV",
+ log.Debugw("missed-put-event",
log.Fields{"key": c.txnKey, "value": val})
res = COMPLETED_BY_OTHER
} else {
@@ -190,7 +191,7 @@
}
case event := <-events:
- log.Debugw("Received KV event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"type": event.EventType})
if event.EventType == kvstore.DELETE {
// The other core failed to process the request; step up
res = ABANDONED_BY_OTHER
@@ -209,7 +210,7 @@
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
- log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
+ log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
switch res {
case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
acquired = true
@@ -219,20 +220,20 @@
return acquired
}
-func (c *EtcdTransaction) deleteTransactionKey() {
- log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) deleteTransactionKey() {
+ log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
+ log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
}
-func (c *EtcdTransaction) Close() error {
- log.Debugw("Close", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Close() error {
+ log.Debugw("close", log.Fields{"key": c.txnKey})
return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
}
-func (c *EtcdTransaction) Delete() error {
- log.Debugw("Delete", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Delete() error {
+ log.Debugw("delete", log.Fields{"key": c.txnKey})
err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
return err
}
diff --git a/rw_core/main.go b/rw_core/main.go
index f53a6ba..cd5dbe9 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -73,6 +73,7 @@
addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
if err != nil {
+ rw.kvClient = nil
log.Error(err)
return err
}
@@ -131,13 +132,23 @@
// log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
//}
+ // Setup KV Client
+ log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
+ err := rw.setKVClient()
+ if err == nil {
+ // Setup KV transaction context
+ c.SetTransactionContext(rw.config.InstanceID,
+ "service/voltha/transactions/",
+ rw.kvClient,
+ rw.config.KVStoreTimeout,
+ rw.config.KVTxnKeyDelTime)
+ }
+
// Create the core service
- rw.core = c.NewCore(rw.config.InstanceID, rw.config)
+ rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
// start the core
rw.core.Start(ctx)
-
- // Setup KV Client
}
func (rw *rwCore) stop() {