VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO: make the prefix & timeout for the device ownership key configurable
(currently hard-coded)
- TODO: make the KV Transaction Monitor timeout configurable
(currently hard-coded)
- TODO: clean up the AdapterRequestHandlerProxy & LogicalDeviceManager
constructors
Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index e7125d3..ec7e4ca 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -56,6 +56,7 @@
type TransactionContext struct {
kvClient kvstore.Client
kvOperationTimeout int
+ monitorLoopTime int64
owner string
timeToDeleteCompletedKeys int
txnPrefix string
@@ -71,7 +72,7 @@
"STOPPED-WAITING-FOR-OTHER"}
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
func NewTransactionContext(
@@ -79,13 +80,15 @@
txnPrefix string,
kvClient kvstore.Client,
kvOpTimeout int,
- keyDeleteTime int) *TransactionContext {
+ keyDeleteTime int,
+ monLoopTime int64) *TransactionContext {
return &TransactionContext{
owner: owner,
txnPrefix: txnPrefix,
kvClient: kvClient,
kvOperationTimeout: kvOpTimeout,
+ monitorLoopTime: monLoopTime,
timeToDeleteCompletedKeys: keyDeleteTime}
}
@@ -99,18 +102,21 @@
* will be created (e.g. "service/voltha/transactions")
* :param kvClient: The client API used for all interactions with the KV store. Currently
* only the etcd client is supported.
- * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
- * package
- * :param keyDeleteTime: The time to wait, in the background, before deleting a
- * TRANSACTION_COMPLETE key
+ * :param kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
+ * used by this package
+ * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
+ * a TRANSACTION_COMPLETE key
+ * :param monLoopTime: The time in milliseconds that the monitor sleeps between
+ * checks for the existence of the transaction key
*/
func SetTransactionContext(owner string,
txnPrefix string,
kvClient kvstore.Client,
kvOpTimeout int,
- keyDeleteTime int) error {
+ keyDeleteTime int,
+ monLoopTime int64) error {
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
return nil
}
@@ -170,42 +176,10 @@
} else {
// Another core instance has reserved the request
// Watch for reservation expiry or successful request completion
- events := ctx.kvClient.Watch(c.txnKey)
log.Debugw("watch-other-server",
log.Fields{"owner": currOwner, "timeout": duration})
- select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- log.Debug("missed-deleted-event")
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event",
- log.Fields{"key": c.txnKey, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- res = STOPPED_WAITING_FOR_OTHER
- }
-
- case event := <-events:
- log.Debugw("received-event", log.Fields{"type": event.EventType})
- if event.EventType == kvstore.DELETE {
- // The other core failed to process the request; step up
- res = ABANDONED_BY_OTHER
- } else if event.EventType == kvstore.PUT {
- key, e1 := kvstore.ToString(event.Key)
- val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
- res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
- c.Delete()
- }
- }
- }
+ res = c.Watch(duration)
}
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
@@ -224,6 +198,103 @@
return acquired
}
+/*
+ * Monitor tracks the progress of a request that is expected to be reserved and
+ * processed by another Voltha core.
+ *
+ * It first polls the KV store every ctx.monitorLoopTime milliseconds, for at most
+ * duration milliseconds, until reading the transaction key returns something other
+ * than "no error, no key". If that happens it calls Watch(duration) to wait for the
+ * outcome; otherwise the result is STOPPED_WAITING_FOR_OTHER.
+ *
+ * :param duration: The duration of the reservation in milliseconds
+ * :return: true  - result is ABANDONED_BY_OTHER or STOPPED_WAITING_FOR_OTHER,
+ *                  process the request
+ *          false - any other result, the request is left to the other core
+ *                  (returned after an additional one-second pause)
+ */
+func (c *KVTransaction) Monitor(duration int64) bool {
+	var res int
+
+	// A non-positive polling interval would never advance the loop counter and
+	// would spin forever while the key is absent. Fall back to a single check
+	// followed by one sleep spanning the whole reservation.
+	step := ctx.monitorLoopTime
+	if step <= 0 {
+		step = duration
+	}
+
+	// Check if transaction key has been set
+	keyExists := false
+	for timeElapsed := int64(0); timeElapsed < duration; timeElapsed += step {
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		if err == nil && kvp == nil {
+			// This core has received the request before the core that actually
+			// owns the device. The owning core has yet to seize the transaction.
+			time.Sleep(time.Duration(step) * time.Millisecond)
+			continue
+		}
+		// Either the key is present or the lookup failed; in both cases stop
+		// polling and let Watch determine the outcome.
+		keyExists = true
+		log.Debug("waited-for-other-to-reserve-transaction")
+		break
+	}
+	if keyExists {
+		// Watch for reservation expiry or successful request completion
+		log.Debugw("watch-other-server", log.Fields{"timeout": duration})
+		res = c.Watch(duration)
+	} else {
+		res = STOPPED_WAITING_FOR_OTHER
+	}
+	// Clean-up: delete the transaction key after a long delay
+	go c.deleteTransactionKey()
+
+	log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+	acquired := res == ABANDONED_BY_OTHER || res == STOPPED_WAITING_FOR_OTHER
+
+	// Ensure the request watcher does not reply before the request server
+	if !acquired {
+		time.Sleep(1 * time.Second)
+	}
+	return acquired
+}
+
+// Watch waits for the outcome of a transaction reserved by another core and
+// returns the resulting transaction state.
+//
+// It opens a KV watch on the transaction key and waits for the first event or
+// for duration milliseconds, whichever comes first. On timeout the key is read
+// directly in case an event was missed.
+//
+// NOTE(review): the watch channel obtained here is not released in this
+// function; confirm whether the KV client expects an explicit close.
+func (c *KVTransaction) Watch(duration int64) int {
+	var res int
+
+	events := ctx.kvClient.Watch(c.txnKey)
+	select {
+	// Add a timeout here in case we miss an event from the KV
+	case <-time.After(time.Duration(duration) * time.Millisecond):
+		// In case of missing events, let's check the transaction key
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		switch {
+		case err == nil && kvp == nil:
+			log.Debug("missed-deleted-event")
+			res = ABANDONED_BY_OTHER
+		case kvp == nil:
+			// The lookup failed and returned nothing to inspect. Reading
+			// kvp.Value here would dereference nil, so report that we gave up
+			// waiting instead.
+			log.Debugw("get-transaction-key-failed",
+				log.Fields{"key": c.txnKey, "error": err})
+			res = STOPPED_WAITING_FOR_OTHER
+		default:
+			if val, e := kvstore.ToString(kvp.Value); e == nil && val == TRANSACTION_COMPLETE {
+				log.Debugw("missed-put-event",
+					log.Fields{"key": c.txnKey, "value": val})
+				res = COMPLETED_BY_OTHER
+			} else {
+				res = STOPPED_WAITING_FOR_OTHER
+			}
+		}
+
+	case event := <-events:
+		log.Debugw("received-event", log.Fields{"type": event.EventType})
+		if event.EventType == kvstore.DELETE {
+			// The other core failed to process the request
+			res = ABANDONED_BY_OTHER
+		} else if event.EventType == kvstore.PUT {
+			key, e1 := kvstore.ToString(event.Key)
+			val, e2 := kvstore.ToString(event.Value)
+			if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+				res = COMPLETED_BY_OTHER
+				// Successful request completion has been detected
+				// Remove the transaction key
+				c.Delete()
+			}
+			// Any other PUT leaves res at its zero value.
+		}
+	}
+	return res
+}
+
func (c *KVTransaction) deleteTransactionKey() {
log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)