[VOL-1997] Remove transaction timeout for a non-active rw_core
This commit cleans up the transaction processing between two
cores in a pair. It prevents the core that is not processing the request
from grabbing the request based on a timeout only.
Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be found)
were run against it, as well as some basic manual tests with
kind-voltha.
There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.
Minor updates after first review.
Comment updates after second review.
Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 987a3f6..482abfc 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,6 @@
"google.golang.org/grpc/status"
"io"
"sync"
- "time"
)
const (
@@ -124,62 +123,41 @@
return handler.coreInCompetingMode
}
-// acquireRequest handles transaction processing for device creation and list requests, i.e. when there are no
-// specific id requested (list scenario) or id present in the request (creation use case).
-func (handler *APIHandler) acquireRequest(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
+// acquisition. If the device is owned by this Core (in a core-pair) then acquire the transaction with a
+// timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
+// core-pair to proceed with it). If the device is not owned then this Core will just monitor the transaction
+// for potential timeouts.
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
}
- log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
return nil, err
- } else if txn.Acquired(timeout) {
+ }
+ var acquired bool
+ if id != nil {
+ var ownedByMe bool
+ if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+ log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
+ return nil, errorTransactionInvalidId
+ }
+ acquired, err = txn.Acquired(timeout, ownedByMe)
+ } else {
+ acquired, err = txn.Acquired(timeout)
+ }
+ if err == nil && acquired {
+ log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
return txn, nil
} else {
+ log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
return nil, errorTransactionNotAcquired
}
}
-// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
-// acquisition. If the device is owned by this Core (in a core-pair) then acquire the transaction with a
-// timeout value (in the event of a timeout the other Core in the core-pair will proceed with the transaction). If the
-// device is not owned then this Core will just monitor the transaction for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
- t := time.Now()
- timeout := handler.defaultRequestTimeout
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return nil, err
- }
-
- owned := false
- if id != nil {
- owned = handler.core.deviceOwnership.OwnedByMe(id)
- }
- if owned {
- if txn.Acquired(timeout) {
- log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
- } else {
- if txn.Monitor(timeout) {
- log.Debugw("acquired-transaction-after-timeout", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
- return txn, nil
- } else {
- log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
- return nil, errorTransactionNotAcquired
- }
- }
-}
-
-// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where a nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
select {
@@ -390,7 +368,7 @@
func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
log.Debug("ListLogicalDevices-request")
if handler.competeForTransaction() {
- if txn, err := handler.acquireRequest(ctx); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return &voltha.LogicalDevices{}, err
} else {
defer txn.Close()
@@ -455,7 +433,7 @@
if handler.competeForTransaction() {
// There are no device Id present in this function.
- if txn, err := handler.acquireRequest(ctx); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -557,9 +535,11 @@
if handler.competeForTransaction() {
if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
- if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
- // Remove the device in memory
- handler.deviceMgr.stopManagingDevice(id.Id)
+ if err == errorTransactionNotAcquired {
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}); !ownedByMe && err == nil {
+ // Remove the device in memory
+ handler.deviceMgr.stopManagingDevice(id.Id)
+ }
}
return new(empty.Empty), err
} else {
@@ -808,7 +788,7 @@
//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
// request. For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
- if handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}) {
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}); ownedByMe && err == nil {
agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
agent.packetOut(packet.PacketOut)
}