[VOL-1997] Remove transaction timeout for a non-active rw_core
This commit cleans up the transaction processing between the two
cores in a pair. It prevents the core that is not processing a request
from grabbing that request based solely on a timeout.
Since this update relies heavily on the etcd mechanism, customized
local tests (not unit tests, as no full-featured etcd mock could be
found) were run against it, as well as some basic manual tests with
kind-voltha.
This commit includes a TODO item to implement a peer-probe
mechanism that guarantees a core in a pair has actually died
before a switchover is done.
Minor updates after the first review.
Comment updates after the second review.
Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bd84bdd..5247247 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -60,22 +60,6 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
- timeout := rhp.defaultRequestTimeout
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn := NewKVTransaction(transactionId)
- if txn == nil {
- return nil, errors.New("fail-to-create-transaction")
- } else if txn.Acquired(timeout) {
- log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
-}
-
// This is a helper function that attempts to acquire the request by using the device ownership model
func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
timeout := rhp.defaultRequestTimeout
@@ -87,22 +71,24 @@
return nil, errors.New("fail-to-create-transaction")
}
- if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
- log.Debugw("owned-by-me", log.Fields{"Id": devId})
- if txn.Acquired(timeout) {
- log.Debugw("processing-request", log.Fields{"Id": devId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
+ var acquired bool
+ var err error
+ if devId != "" {
+ var ownedByMe bool
+ if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}); err != nil {
+ log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devId, "error": err})
+ return nil, kafka.ErrorTransactionInvalidId
}
+ acquired, err = txn.Acquired(timeout, ownedByMe)
} else {
- log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
- if txn.Monitor(timeout) {
- log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
+ acquired, err = txn.Acquired(timeout)
+ }
+ if err == nil && acquired {
+ log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
+ return txn, nil
+ } else {
+ log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
+ return nil, kafka.ErrorTransactionNotAcquired
}
}
@@ -120,7 +106,7 @@
}
adapter := &voltha.Adapter{}
deviceTypes := &voltha.DeviceTypes{}
- transactionID := &ic.StrType{}
+ transactionId := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "adapter":
@@ -134,22 +120,23 @@
return nil, err
}
case kafka.TransactionKey:
- if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ if err := ptypes.UnmarshalAny(arg.Value, transactionId); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionId": transactionId.Val, "coreId": rhp.coreInstanceId})
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // Update our adapters in memory
- go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ if txn, err := rhp.takeRequestOwnership(transactionId.Val, ""); err != nil {
+ if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
+ log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionId})
+ // Update our adapters in memory
+ go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+ }
+ return nil, err
} else {
defer txn.Close()
}
@@ -189,9 +176,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -238,9 +224,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -300,9 +285,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -342,8 +326,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -390,6 +374,16 @@
allPorts.Items = append(allPorts.Items, aPort)
return allPorts, nil
}
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
+ } else {
+ defer txn.Close()
+ }
+ }
+
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
@@ -421,9 +415,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -505,9 +498,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -565,9 +557,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -580,10 +571,6 @@
go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
voltha.ConnectStatus_ConnectStatus(connStatus.Val))
- //if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
- // voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
- // return nil, err
- //}
return new(empty.Empty), nil
}
@@ -627,9 +614,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -683,9 +669,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -746,9 +731,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -795,9 +779,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -839,9 +822,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -883,9 +865,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -936,9 +917,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -980,9 +960,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1039,8 +1018,8 @@
// duplicates.
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1087,9 +1066,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1132,9 +1110,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}