VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- Need to make prefix & timeout for the device ownership key configurable,
currently hard-coded
- Need to make KV Transaction Monitor timeout configurable,
currently hard-coded
- Need to clean up AdapterRequestHandlerProxy & LogicalDeviceManager
constructors
Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index f5295c4..3d9487f 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -40,12 +40,14 @@
defaultRequestTimeout int64
longRunningRequestTimeout int64
coreInCompetingMode bool
+ core *Core
}
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func NewAdapterRequestHandlerProxy(core *Core, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
+ proxy.core = core
proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
proxy.lDeviceMgr = ldMgr
@@ -58,7 +60,7 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
timeout := rhp.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -74,6 +76,33 @@
}
}
+// This is a helper function that attempts to acquire the request by using the device ownership model
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := rhp.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn := NewKVTransaction(transactionId)
+ if txn == nil {
+ return nil, errors.New("fail-to-create-transaction")
+ }
+
+ if rhp.core.deviceOwnership.OwnedByMe(devId) {
+ if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+ } else {
+ if txn.Monitor(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("device-not-owned")
+ }
+ }
+}
+
// competeForTransaction is a helper function to determine whether every request needs to compete with another
// Core to execute the request
func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
@@ -112,7 +141,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// Update our adapters in memory
go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
@@ -156,7 +185,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -224,7 +253,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -294,7 +323,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -336,7 +365,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -416,7 +445,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -500,7 +529,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -557,7 +586,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -616,7 +645,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -682,7 +711,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -733,7 +762,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -785,7 +814,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -887,7 +916,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil