[VOL-1997] Remove transaction timeout for a non-active rw_core
This commit cleans up the transaction processing between the two
cores in a pair. It prevents the core that is not processing a request
from grabbing that request based on a timeout alone.
Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, along with some basic manual tests using
kind-voltha.
There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.
Minor updates after first review.
Comment updates after second review.
Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 11f3d4e..1b4370d 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -25,12 +25,6 @@
* TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
* and then deletes the transaction key.
*
- * To ensure the key is removed despite possible standby core failures, a KV operation is
- * scheduled in the background on both cores to delete the key well after the transaction is
- * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
- * long enough, on the order of many seconds, to ensure the standby sees the transaction
- * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
- * the KV store.
*/
package core
@@ -48,23 +42,36 @@
SEIZED_BY_SELF
COMPLETED_BY_OTHER
ABANDONED_BY_OTHER
- STOPPED_WATCHING_KEY
- STOPPED_WAITING_FOR_KEY
+ ABANDONED_WATCH_BY_SELF
)
var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+var errorTransactionInvalidId = status.Error(codes.Canceled, "transaction-invalid-id")
const (
TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
)
+// Transaction constants used to guarantee that the Core processing a request holds on to the transaction until
+// it either completes it (either successfully or by timing out) or the Core itself crashes (e.g. a server failure).
+// If a request has a timeout of x seconds then the Core processing the request will renew the transaction lease
+// every x/NUM_TXN_RENEWAL_PER_REQUEST seconds. After the Core completes the request it stops renewing the
+// transaction and sets the transaction value to TRANSACTION_COMPLETE. If the processing Core crashes then it
+// will not renew the transaction, causing the KV store to delete the transaction after its renewal period. The
+// Core watching the transaction will then take over.
+// Since MIN_TXN_RENEWAL_INTERVAL_IN_SEC is 3 seconds, no transaction renewal is done for any transaction that
+// completes within 3 seconds.
+const (
+ NUM_TXN_RENEWAL_PER_REQUEST = 2
+ MIN_TXN_RENEWAL_INTERVAL_IN_SEC = 3
+ MIN_TXN_RESERVATION_DURATION_IN_SEC = 5
+)
+
type TransactionContext struct {
- kvClient kvstore.Client
- kvOperationTimeout int
- monitorLoopTime int64
- owner string
- timeToDeleteCompletedKeys int
- txnPrefix string
+ kvClient kvstore.Client
+ kvOperationTimeout int
+ owner string
+ txnPrefix string
}
var ctx *TransactionContext
@@ -74,8 +81,7 @@
"SEIZED-BY-SELF",
"COMPLETED-BY-OTHER",
"ABANDONED-BY-OTHER",
- "STOPPED-WATCHING-KEY",
- "STOPPED-WAITING-FOR-KEY"}
+ "ABANDONED_WATCH_BY_SELF"}
func init() {
log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -85,17 +91,13 @@
owner string,
txnPrefix string,
kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int,
- monLoopTime int64) *TransactionContext {
+ kvOpTimeout int) *TransactionContext {
return &TransactionContext{
- owner: owner,
- txnPrefix: txnPrefix,
- kvClient: kvClient,
- kvOperationTimeout: kvOpTimeout,
- monitorLoopTime: monLoopTime,
- timeToDeleteCompletedKeys: keyDeleteTime}
+ owner: owner,
+ txnPrefix: txnPrefix,
+ kvClient: kvClient,
+ kvOperationTimeout: kvOpTimeout}
}
/*
@@ -110,26 +112,20 @@
* only the etcd client is supported.
* :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
* used by this package
- * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
- * a TRANSACTION_COMPLETE key
- * :param monLoopTime: The time in milliseconds that the monitor sleeps between
- * checks for the existence of the transaction key
*/
func SetTransactionContext(owner string,
txnPrefix string,
kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int,
- monLoopTime int64) error {
+ kvOpTimeout int) error {
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout)
return nil
}
type KVTransaction struct {
- ch chan int
- txnId string
- txnKey string
+ monitorCh chan int
+ txnId string
+ txnKey string
}
/*
@@ -145,128 +141,117 @@
}
/*
- * This function returns a boolean indicating whether or not the caller should process
- * the request. True is returned in one of two cases:
- * (1) The current core successfully reserved the request's serial number with the KV store
- * (2) The current core failed in its reservation attempt but observed that the serving core
- * has abandoned processing the request
+ * Acquired is invoked by a Core, upon reception of a request, to reserve the transaction key in the KV store. The
+ * request may be resource specific (i.e will include an ID for that resource) or may be generic (i.e. list a set of
+ * resources). If the request is resource specific then this function should be invoked with the ownedByMe flag to
+ * indicate whether this Core owns this resource. In the case where this Core owns this resource or it is a generic
+ * request then we will proceed to reserve the transaction key in the KV store for a minimum time specified by the
+ * minDuration param. If the reservation request fails (i.e. the other Core got the reservation before this one - this
+ * can happen only for generic request) then the Core will start to watch for changes to the key to determine
+ * whether the other Core completed the transaction successfully or the Core just died. If the Core does not own the
+ * resource then we will proceed to watch the transaction key.
*
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation acquired, process the request
- * false - reservation not acquired, request being processed by another core
+ * :param minDuration: minimum time to reserve the transaction key in the KV store
+ * :param ownedByMe: specify whether the request is about a resource owned or not. If it's absent then this is a
+ * generic request that has no specific resource ID (e.g. list)
+ *
+ * :return: A boolean specifying whether the resource was acquired. An error is returned in case this function is
+ * invoked for a resource that is nonexistent.
*/
-func (c *KVTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
var acquired bool
var currOwner string = ""
var res int
// Convert milliseconds to seconds, rounding up
// The reservation TTL is specified in seconds
- durationInSecs := duration / 1000
- if remainder := duration % 1000; remainder > 0 {
+ durationInSecs := minDuration / 1000
+ if remainder := minDuration % 1000; remainder > 0 {
durationInSecs++
}
- value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+ if durationInSecs < int64(MIN_TXN_RESERVATION_DURATION_IN_SEC) {
+ durationInSecs = int64(MIN_TXN_RESERVATION_DURATION_IN_SEC)
+ }
+ genericRequest := true
+ resourceOwned := false
+ if len(ownedByMe) > 0 {
+ genericRequest = false
+ resourceOwned = ownedByMe[0]
+ }
+ if resourceOwned || genericRequest {
+ // Keep the reservation longer that the minDuration (which is really the request timeout) to ensure the
+ // transaction key stays in the KV store until after the Core finalize a request timeout condition (which is
+ // a success from a request completion perspective).
+ if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+ res = SEIZED_BY_SELF
+ } else {
+ log.Debugw("watch-other-server",
+ log.Fields{"transactionId": c.txnId, "owner": currOwner, "timeout": durationInSecs})
+ res = c.Watch(durationInSecs)
+ }
+ } else {
+ res = c.Watch(durationInSecs)
+ }
+ switch res {
+ case SEIZED_BY_SELF, ABANDONED_BY_OTHER:
+ acquired = true
+ default:
+ acquired = false
+ }
+ log.Debugw("acquire-transaction-status", log.Fields{"transactionId": c.txnId, "acquired": acquired, "result": txnState[res]})
+ return acquired, nil
+}
- // If the reservation failed, do we simply abort or drop into watch mode anyway?
- // Setting value to nil leads to watch mode
+func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+ var currOwner string = ""
+ var res int
+ value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
if value != nil {
- if currOwner, err = kvstore.ToString(value); err != nil {
- log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
- value = nil
+ if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
+ log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnId, "error": err})
+ return err
+ }
+ if currOwner == ctx.owner {
+ log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnId, "result": txnState[res]})
+ // Setup the monitoring channel
+ c.monitorCh = make(chan int)
+ go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+ return nil
}
}
- if err == nil && value != nil && currOwner == ctx.owner {
- // Process the request immediately
- res = SEIZED_BY_SELF
- } else {
- // Another core instance has reserved the request
- // Watch for reservation expiry or successful request completion
- log.Debugw("watch-other-server",
- log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
-
- res = c.Watch(duration)
- }
- // Clean-up: delete the transaction key after a long delay
- go c.deleteTransactionKey()
-
- log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
- switch res {
- case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
- acquired = true
- default:
- acquired = false
- }
- // Ensure the request watcher does not reply before the request server
- if !acquired {
- log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
- }
- return acquired
+ return status.Error(codes.PermissionDenied, "reservation-denied")
}
-/*
- * This function monitors the progress of a request that's been reserved by another
- * Voltha core.
- *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation abandoned by the other core, process the request
- * false - reservation not owned, request being processed by another core
- */
-func (c *KVTransaction) Monitor(duration int64) bool {
- var acquired bool
- var res int
-
- // Convert milliseconds to seconds, rounding up
- // The reservation TTL is specified in seconds
- durationInSecs := duration / 1000
- if remainder := duration % 1000; remainder > 0 {
- durationInSecs++
- }
-
- res = c.Watch(duration)
-
- // Clean-up: delete the transaction key after a long delay
- go c.deleteTransactionKey()
-
- log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
- switch res {
- case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
- acquired = true
- default:
- acquired = false
- }
- // Ensure the request watcher does not reply before the request server
- if !acquired {
- log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
- }
- return acquired
-}
-
-// duration in milliseconds
-func (c *KVTransaction) Watch(duration int64) int {
+func (c *KVTransaction) Watch(durationInSecs int64) int {
var res int
events := ctx.kvClient.Watch(c.txnKey)
defer ctx.kvClient.CloseWatch(c.txnKey, events)
+ transactionWasAcquiredByOther := false
+
+ //Check whether the transaction was already completed by the other Core before we got here.
+ if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+ transactionWasAcquiredByOther = true
+ if val, err := kvstore.ToString(kvp.Value); err == nil {
+ if val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
+ c.Delete()
+ return res
+ }
+ } else {
+ // An unexpected value - let's get out of here as something did not go according to plan
+ res = ABANDONED_WATCH_BY_SELF
+ log.Debugw("cannot-read-transaction-value", log.Fields{"txn": c.txnId, "error": err})
+ return res
+ }
+ }
+
for {
select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
- res = STOPPED_WATCHING_KEY
- }
-
case event := <-events:
+ transactionWasAcquiredByOther = true
log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
if event.EventType == kvstore.DELETE {
// The other core failed to process the request
@@ -274,39 +259,73 @@
} else if event.EventType == kvstore.PUT {
key, e1 := kvstore.ToString(event.Key)
val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil {
+ if e1 == nil && e2 == nil && key == c.txnKey {
if val == TRANSACTION_COMPLETE {
res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
+ // Successful request completion has been detected. Remove the transaction key
c.Delete()
} else {
- log.Debugf("Ignoring reservation PUT event with val %v for key %v",
- val, key)
+ log.Debugw("Ignoring-PUT-event", log.Fields{"val": val, "key": key})
continue
}
+ } else {
+ log.Warnw("received-unexpected-PUT-event", log.Fields{"txn": c.txnId, "key": key, "ctxKey": c.txnKey})
}
}
+ case <-time.After(time.Duration(durationInSecs) * time.Second):
+ // Corner case: In the case where the Core owning the device dies and before this Core takes ownership of
+ // this device there is a window where new requests will end up being watched instead of being processed.
+ // Grab the request if the other Core did not create the transaction in the KV store.
+ // TODO: Use a peer-monitoring probe to switch over (still relies on the probe frequency). This will
+ // guarantee that the peer is actually gone instead of limiting the time the peer can get hold of a
+ // request.
+ if !transactionWasAcquiredByOther {
+ log.Debugw("timeout-no-peer", log.Fields{"txId": c.txnId})
+ res = ABANDONED_BY_OTHER
+ } else {
+ continue
+ }
}
break
}
return res
}
-func (c *KVTransaction) deleteTransactionKey() {
- log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
- time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
- ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-}
-
func (c *KVTransaction) Close() error {
log.Debugw("close", log.Fields{"txn": c.txnId})
- return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+ // Stop monitoring the key (applies only when there has been no transaction switch over)
+ if c.monitorCh != nil {
+ close(c.monitorCh)
+ ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+ }
+ return nil
}
func (c *KVTransaction) Delete() error {
log.Debugw("delete", log.Fields{"txn": c.txnId})
- err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
- return err
+ return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
+}
+
+// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete. durationInSecs
+// is used to calculate the frequency at which the Core processing the transaction renews the lease. This function
+// exits only when the transaction is Closed, i.e completed.
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+ log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnId})
+ renewInterval := durationInSecs / NUM_TXN_RENEWAL_PER_REQUEST
+ if renewInterval < MIN_TXN_RENEWAL_INTERVAL_IN_SEC {
+ renewInterval = MIN_TXN_RENEWAL_INTERVAL_IN_SEC
+ }
+forLoop:
+ for {
+ select {
+ case <-c.monitorCh:
+ log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnId})
+ break forLoop
+ case <-time.After(time.Duration(renewInterval) * time.Second):
+ if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+ // Log and continue.
+ log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
+ }
+ }
+ }
}