VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Both etcd and consul KV stores are supported
- Incorporated feedback from last code inspection
- Connected Core object to a KV client
- Added docker compose file for etcd testing
Change-Id: I5e3c9637f4e57d6cf7fa1102e4b3507f17bc8979
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 44d9dc1..c2a3634 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -17,9 +17,10 @@
/*
* Two voltha cores receive the same request; each tries to acquire ownership of the request
* by writing its identifier (e.g. container name or pod name) to the transaction key named
- * after the serial number of the request. The core that loses the race to reserve the request
- * watches the progress of the core actually serving the request. Once the request is complete,
- * the serving core closes the transaction by invoking the KVTransaction's Close method, which
+ * after the serial number of the request. The core that loses the race for acquisition
+ * monitors the progress of the core actually serving the request by watching for changes
+ * in the value of the transaction key. Once the request is complete, the
+ * serving core closes the transaction by invoking the KVTransaction's Close method, which
* replaces the value of the transaction (i.e. serial number) key with the string
* TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
* and then deletes the transaction key.
@@ -34,8 +35,8 @@
package core
import (
- "kvstore"
"time"
+ "github.com/opencord/voltha-go/db/kvstore"
log "github.com/opencord/voltha-go/common/log"
)
@@ -53,7 +54,7 @@
)
type TransactionContext struct {
- kvClient *kvstore.EtcdClient
+ kvClient kvstore.Client
kvOperationTimeout int
owner string
timeToDeleteCompletedKeys int
@@ -75,7 +76,7 @@
func NewTransactionContext(
owner string,
txnPrefix string,
- kvClient *kvstore.EtcdClient,
+ kvClient kvstore.Client,
kvOpTimeout int,
keyDeleteTime int) *TransactionContext {
@@ -104,7 +105,7 @@
*/
func SetTransactionContext(owner string,
txnPrefix string,
- kvClient *kvstore.EtcdClient,
+ kvClient kvstore.Client,
kvOpTimeout int,
keyDeleteTime int) error {
@@ -112,7 +113,7 @@
return nil
}
-type EtcdTransaction struct {
+type KVTransaction struct {
ch chan int
txnId string
txnKey string
@@ -122,10 +123,10 @@
* A KVTransaction constructor
*
* :param txnId: The serial number of a voltha request.
- * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
+ * :return: A KVTransaction instance
*/
-func NewEtcdTransaction(txnId string) *EtcdTransaction {
- return &EtcdTransaction{
+func NewKVTransaction(txnId string) *KVTransaction {
+ return &KVTransaction{
txnId: txnId,
txnKey: ctx.txnPrefix + txnId}
}
@@ -141,7 +142,7 @@
* :return: true - reservation acquired, process the request
* false - reservation not acquired, request being processed by another core
*/
-func (c *EtcdTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(duration int64) bool {
var acquired bool
var currOwner string = ""
var res int
@@ -158,7 +159,7 @@
// Setting value to nil leads to watch mode
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil {
- log.Fatal("Unexpected owner type")
+ log.Error("unexpected-owner-type")
value = nil
}
}
@@ -166,23 +167,23 @@
// Process the request immediately
res = SEIZED_BY_SELF
} else {
- log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
// Another core instance has reserved the request
// Watch for reservation expiry or successful request completion
- // Add a timeout here in case we miss an event from the KV
- log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
events := ctx.kvClient.Watch(c.txnKey)
+ log.Debugw("watch-other-server",
+ log.Fields{"owner": currOwner, "timeout": duration})
select {
+ // Add a timeout here in case we miss an event from the KV
case <-time.After(time.Duration(duration) * time.Millisecond):
// In case of missing events, let's check the transaction key
kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
if err == nil && kvp == nil {
- log.Debug("Missed DELETED event from KV")
+ log.Debug("missed-deleted-event")
res = ABANDONED_BY_OTHER
} else if val, err := kvstore.ToString(kvp.Value);
err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("Missed PUT event from KV",
+ log.Debugw("missed-put-event",
log.Fields{"key": c.txnKey, "value": val})
res = COMPLETED_BY_OTHER
} else {
@@ -190,7 +191,7 @@
}
case event := <-events:
- log.Debugw("Received KV event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"type": event.EventType})
if event.EventType == kvstore.DELETE {
// The other core failed to process the request; step up
res = ABANDONED_BY_OTHER
@@ -209,7 +210,7 @@
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
- log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
+ log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
switch res {
case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
acquired = true
@@ -219,20 +220,20 @@
return acquired
}
-func (c *EtcdTransaction) deleteTransactionKey() {
- log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) deleteTransactionKey() {
+ log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
+ log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
}
-func (c *EtcdTransaction) Close() error {
- log.Debugw("Close", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Close() error {
+ log.Debugw("close", log.Fields{"key": c.txnKey})
return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
}
-func (c *EtcdTransaction) Delete() error {
- log.Debugw("Delete", log.Fields{"key": c.txnKey})
+func (c *KVTransaction) Delete() error {
+ log.Debugw("delete", log.Fields{"key": c.txnKey})
err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
return err
}