VOL-1885 : Ignore the reservation PUTs from transaction owner.
This will ensure that the rw_core that is not the owner of the transaction does not respond before
the active transaction owner has marked the transaction as complete.
Change-Id: Idf1baa23ca0d8b602fdc3e1b5658cc3010dbefff
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 711f3b5..0d30340 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -195,7 +195,7 @@
}
// Ensure the request watcher does not reply before the request server
if !acquired {
- time.Sleep(1 * time.Second)
+ log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
}
return acquired
}
@@ -233,7 +233,7 @@
}
// Ensure the request watcher does not reply before the request server
if !acquired {
- time.Sleep(1 * time.Second)
+ log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
}
return acquired
}
@@ -243,37 +243,47 @@
var res int
events := ctx.kvClient.Watch(c.txnKey)
- select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
- res = STOPPED_WATCHING_KEY
- }
- case event := <-events:
- log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
- if event.EventType == kvstore.DELETE {
- // The other core failed to process the request
- res = ABANDONED_BY_OTHER
- } else if event.EventType == kvstore.PUT {
- key, e1 := kvstore.ToString(event.Key)
- val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+ for {
+ select {
+ // Add a timeout here in case we miss an event from the KV
+ case <-time.After(time.Duration(duration) * time.Millisecond):
+ // In case of missing events, let's check the transaction key
+ kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+ if err == nil && kvp == nil {
+ log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
+ res = ABANDONED_BY_OTHER
+ } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+ log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
- c.Delete()
+ } else {
+ log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
+ res = STOPPED_WATCHING_KEY
+ }
+
+ case event := <-events:
+ log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
+ if event.EventType == kvstore.DELETE {
+ // The other core failed to process the request
+ res = ABANDONED_BY_OTHER
+ } else if event.EventType == kvstore.PUT {
+ key, e1 := kvstore.ToString(event.Key)
+ val, e2 := kvstore.ToString(event.Value)
+ if e1 == nil && key == c.txnKey && e2 == nil {
+ if val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Successful request completion has been detected
+ // Remove the transaction key
+ c.Delete()
+ } else {
+ log.Debugf("Ignoring reservation PUT event with val %v for key %v",
+ val, key)
+ continue
+ }
+ }
}
}
+ break
}
return res
}