VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO: make the prefix & timeout for the device ownership key configurable
  (both are currently hard-coded)
- TODO: make the KV Transaction Monitor timeout configurable
  (it is currently hard-coded)
- TODO: clean up the AdapterRequestHandlerProxy & LogicalDeviceManager
  constructors

Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index f5295c4..3d9487f 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -40,12 +40,14 @@
 	defaultRequestTimeout     int64
 	longRunningRequestTimeout int64
 	coreInCompetingMode       bool
+	core                      *Core
 }
 
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func NewAdapterRequestHandlerProxy(core *Core, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
 	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
 	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
+	proxy.core = core
 	proxy.coreInstanceId = coreInstanceId
 	proxy.deviceMgr = dMgr
 	proxy.lDeviceMgr = ldMgr
@@ -58,7 +60,7 @@
 	return &proxy
 }
 
-func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := rhp.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -74,6 +76,33 @@
 	}
 }
 
+// takeRequestOwnership chooses how to claim a request based on the device ownership model: when this
+// Core owns devId it calls txn.Acquired, otherwise txn.Monitor. maxTimeout[0], if given, replaces the default timeout.
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
+	requestTimeout := rhp.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		requestTimeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": requestTimeout})
+	kvTxn := NewKVTransaction(transactionId)
+	if kvTxn == nil {
+		return nil, errors.New("fail-to-create-transaction")
+	}
+
+	// This Core does not own the device: monitor the transaction rather than acquiring it.
+	if !rhp.core.deviceOwnership.OwnedByMe(devId) {
+		if !kvTxn.Monitor(requestTimeout) {
+			return nil, errors.New("device-not-owned")
+		}
+		return kvTxn, nil
+	}
+	// This Core owns the device: try to acquire the transaction.
+	if !kvTxn.Acquired(requestTimeout) {
+		return nil, errors.New("failed-to-seize-request")
+	}
+	return kvTxn, nil
+}
+
 // competeForTransaction is a helper function to determine whether every request needs to compete with another
 // Core to execute the request
 func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
@@ -112,7 +141,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// Update our adapters in memory
 			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
@@ -156,7 +185,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -224,7 +253,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -294,7 +323,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -336,7 +365,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -416,7 +445,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -500,7 +529,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -557,7 +586,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -616,7 +645,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -682,7 +711,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -733,7 +762,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -785,7 +814,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -887,7 +916,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil