VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Etcd is the only KV store currently supported
Change-Id: Ic3edcd6b98950686561d9d59ebacca2be4d99358
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
new file mode 100644
index 0000000..44d9dc1
--- /dev/null
+++ b/rw_core/core/transaction.go
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Two voltha cores receive the same request; each tries to acquire ownership of the request
+ * by writing its identifier (e.g. container name or pod name) to the transaction key named
+ * after the serial number of the request. The core that loses the race to reserve the request
+ * watches the progress of the core actually serving the request. Once the request is complete,
+ * the serving core closes the transaction by invoking the KVTransaction's Close method, which
+ * replaces the value of the transaction (i.e. serial number) key with the string
+ * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
+ * and then deletes the transaction key.
+ *
+ * To ensure the key is removed despite possible standby core failures, a KV operation is
+ * scheduled in the background on both cores to delete the key well after the transaction is
+ * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
+ * long enough, on the order of many seconds, to ensure the standby sees the transaction
+ * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
+ * the KV store.
+ */
+package core
+
+import (
+ "kvstore"
+ "time"
+ log "github.com/opencord/voltha-go/common/log"
+)
+
+// Transaction acquisition results
+const (
+ UNKNOWN = iota
+ SEIZED_BY_SELF
+ COMPLETED_BY_OTHER
+ ABANDONED_BY_OTHER
+ STOPPED_WAITING_FOR_OTHER
+)
+
+const (
+ TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
+)
+
+type TransactionContext struct {
+ kvClient *kvstore.EtcdClient
+ kvOperationTimeout int
+ owner string
+ timeToDeleteCompletedKeys int
+ txnPrefix string
+}
+var ctx *TransactionContext
+
+var txnState = []string {
+ "UNKNOWN",
+ "SEIZED-BY-SELF",
+ "COMPLETED-BY-OTHER",
+ "ABANDONED-BY-OTHER",
+ "STOPPED-WAITING-FOR-OTHER"}
+
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+func NewTransactionContext(
+ owner string,
+ txnPrefix string,
+ kvClient *kvstore.EtcdClient,
+ kvOpTimeout int,
+ keyDeleteTime int) *TransactionContext {
+
+ return &TransactionContext{
+ owner: owner,
+ txnPrefix: txnPrefix,
+ kvClient: kvClient,
+ kvOperationTimeout: kvOpTimeout,
+ timeToDeleteCompletedKeys: keyDeleteTime}
+}
+
+/*
+ * Before instantiating a KVTransaction, a TransactionContext must be created.
+ * The parameters stored in the context govern the behaviour of all KVTransaction
+ * instances.
+ *
+ * :param owner: The owner (i.e. voltha core name) of a transaction
+ * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
+ * will be created (e.g. "service/voltha/transactions")
+ * :param kvClient: The client API used for all interactions with the KV store. Currently
+ * only the etcd client is supported.
+ * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
+ * package
+ * :param keyDeleteTime: The time to wait, in the background, before deleting a
+ * TRANSACTION_COMPLETE key
+ */
+func SetTransactionContext(owner string,
+ txnPrefix string,
+ kvClient *kvstore.EtcdClient,
+ kvOpTimeout int,
+ keyDeleteTime int) error {
+
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+ return nil
+}
+
+type EtcdTransaction struct {
+ ch chan int
+ txnId string
+ txnKey string
+}
+
+/*
+ * A KVTransaction constructor
+ *
+ * :param txnId: The serial number of a voltha request.
+ * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
+ */
+func NewEtcdTransaction(txnId string) *EtcdTransaction {
+ return &EtcdTransaction{
+ txnId: txnId,
+ txnKey: ctx.txnPrefix + txnId}
+}
+
+/*
+ * This function returns a boolean indicating whether or not the caller should process
+ * the request. True is returned in one of two cases:
+ * (1) The current core successfully reserved the request's serial number with the KV store
+ * (2) The current core failed in its reservation attempt but observed that the serving core
+ * has abandoned processing the request
+ *
+ * :param duration: The duration of the reservation in milliseconds
+ * :return: true - reservation acquired, process the request
+ * false - reservation not acquired, request being processed by another core
+ */
+func (c *EtcdTransaction) Acquired(duration int64) bool {
+ var acquired bool
+ var currOwner string = ""
+ var res int
+
+ // Convert milliseconds to seconds, rounding up
+ // The reservation TTL is specified in seconds
+ durationInSecs := duration / 1000
+ if remainder := duration % 1000; remainder > 0 {
+ durationInSecs++
+ }
+ value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+
+ // If the reservation failed, do we simply abort or drop into watch mode anyway?
+ // Setting value to nil leads to watch mode
+ if value != nil {
+ if currOwner, err = kvstore.ToString(value); err != nil {
+ log.Fatal("Unexpected owner type")
+ value = nil
+ }
+ }
+ if err == nil && value != nil && currOwner == ctx.owner {
+ // Process the request immediately
+ res = SEIZED_BY_SELF
+ } else {
+ log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
+ // Another core instance has reserved the request
+ // Watch for reservation expiry or successful request completion
+ // Add a timeout here in case we miss an event from the KV
+ log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
+ events := ctx.kvClient.Watch(c.txnKey)
+
+ select {
+ case <-time.After(time.Duration(duration) * time.Millisecond):
+ // In case of missing events, let's check the transaction key
+ kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
+ if err == nil && kvp == nil {
+ log.Debug("Missed DELETED event from KV")
+ res = ABANDONED_BY_OTHER
+ } else if val, err := kvstore.ToString(kvp.Value);
+ err == nil && val == TRANSACTION_COMPLETE {
+ log.Debugw("Missed PUT event from KV",
+ log.Fields{"key": c.txnKey, "value": val})
+ res = COMPLETED_BY_OTHER
+ } else {
+ res = STOPPED_WAITING_FOR_OTHER
+ }
+
+ case event := <-events:
+ log.Debugw("Received KV event", log.Fields{"type": event.EventType})
+ if event.EventType == kvstore.DELETE {
+ // The other core failed to process the request; step up
+ res = ABANDONED_BY_OTHER
+ } else if event.EventType == kvstore.PUT {
+ key, e1 := kvstore.ToString(event.Key)
+ val, e2 := kvstore.ToString(event.Value)
+ if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Successful request completion has been detected
+ // Remove the transaction key
+ c.Delete()
+ }
+ }
+ }
+ }
+ // Clean-up: delete the transaction key after a long delay
+ go c.deleteTransactionKey()
+
+ log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
+ switch res {
+ case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+ acquired = true
+ default:
+ acquired = false
+ }
+ return acquired
+}
+
+func (c *EtcdTransaction) deleteTransactionKey() {
+ log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
+ time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
+ log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
+ ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+}
+
+func (c *EtcdTransaction) Close() error {
+ log.Debugw("Close", log.Fields{"key": c.txnKey})
+ return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
+}
+
+func (c *EtcdTransaction) Delete() error {
+ log.Debugw("Delete", log.Fields{"key": c.txnKey})
+ err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+ return err
+}
+