VOL-2658, VOL-2840 - Remove core competing-mode code

Change-Id: Ic34d030bc805c6948369c65db6f77f9739320570
(cherry picked from commit fd27f4b45b3c8d89174a8da7192598b768b4907d)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 3c1c5d0..9ea44a1 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -19,15 +19,15 @@
 import (
 	"context"
 	"errors"
+	"time"
+
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
-	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"time"
 )
 
 // AdapterRequestHandlerProxy represent adapter request handler proxy attributes
@@ -40,13 +40,12 @@
 	clusterDataProxy          *model.Proxy
 	defaultRequestTimeout     time.Duration
 	longRunningRequestTimeout time.Duration
-	coreInCompetingMode       bool
 	core                      *Core
 }
 
 // NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
 func NewAdapterRequestHandlerProxy(core *Core, coreInstanceID string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
-	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout time.Duration,
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, longRunningRequestTimeout time.Duration,
 	defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.core = core
@@ -56,50 +55,11 @@
 	proxy.clusterDataProxy = cdProxy
 	proxy.localDataProxy = ldProxy
 	proxy.adapterMgr = aMgr
-	proxy.coreInCompetingMode = incompetingMode
 	proxy.defaultRequestTimeout = defaultRequestTimeout
 	proxy.longRunningRequestTimeout = longRunningRequestTimeout
 	return &proxy
 }
 
-// This is a helper function that attempts to acquire the request by using the device ownership model
-func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
-	timeout := rhp.defaultRequestTimeout.Milliseconds()
-	if len(maxTimeout) > 0 {
-		timeout = maxTimeout[0]
-	}
-	txn := NewKVTransaction(transactionID)
-	if txn == nil {
-		return nil, errors.New("fail-to-create-transaction")
-	}
-
-	var acquired bool
-	var err error
-	if devID != "" {
-		var ownedByMe bool
-		if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: devID}); err != nil {
-			logger.Warnw("getting-ownership-failed", log.Fields{"deviceId": devID, "error": err})
-			return nil, kafka.ErrorTransactionInvalidId
-		}
-		acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
-	} else {
-		acquired, err = txn.Acquired(ctx, timeout)
-	}
-	if err == nil && acquired {
-		logger.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
-		return txn, nil
-	}
-	logger.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnID, "error": err})
-	return nil, kafka.ErrorTransactionNotAcquired
-}
-
-// competeForTransaction is a helper function to determine whether every request needs to compete with another
-// Core to execute the request
-func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
-	return rhp.coreInCompetingMode
-}
-
-// Register registers the adapter
 func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
 	if len(args) < 3 {
 		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -130,19 +90,6 @@
 	}
 	logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, "")
-		if err != nil {
-			if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
-				logger.Debugw("Another core handled the request", log.Fields{"transactionId": transactionID})
-				// Update our adapters in memory
-				go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(context.TODO(), adapter)
-			}
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
 	return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
 }
 
@@ -172,16 +119,6 @@
 	}
 	logger.Debugw("GetDevice", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	// Get the device via the device manager
 	device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
 	if err != nil {
@@ -216,18 +153,6 @@
 	}
 	logger.Debugw("DeviceUpdate", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, device.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
-	logger.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
-
 	if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
 		logger.Debugw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
 		return nil, err
@@ -279,16 +204,6 @@
 	}
 	logger.Debugw("GetChildDevice", log.Fields{"parentDeviceID": pID.Id, "args": args, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
 }
 
@@ -318,16 +233,6 @@
 	}
 	logger.Debugw("GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, proxyAddress.DeviceId)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
 }
 
@@ -362,16 +267,6 @@
 	}
 	logger.Debugw("GetPorts", log.Fields{"deviceID": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	return rhp.deviceMgr.getPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
 }
 
@@ -401,16 +296,6 @@
 	}
 	logger.Debugw("GetChildDevices", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
 }
 
@@ -479,16 +364,6 @@
 		"deviceType": dt.Val, "channelID": chnlID.Val, "serialNumber": serialNumber.Val,
 		"vendorID": vendorID.Val, "onuID": onuID.Val, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
 		logger.Debugw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
@@ -534,16 +409,6 @@
 	logger.Debugw("DeviceStateUpdate", log.Fields{"deviceID": deviceID.Id, "oper-status": operStatus,
 		"conn-status": connStatus, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
 		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
 		logger.Debugw("unable-to-update-device-status", log.Fields{"error": err})
@@ -590,16 +455,6 @@
 	logger.Debugw("ChildrenStateUpdate", log.Fields{"deviceID": deviceID.Id, "oper-status": operStatus,
 		"conn-status": connStatus, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
 	if err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
 		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
@@ -640,16 +495,6 @@
 	}
 	logger.Debugw("PortsStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
 		logger.Debugw("unable-to-update-ports-state", log.Fields{"error": err})
 		return nil, err
@@ -701,16 +546,6 @@
 	logger.Debugw("PortStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus,
 		"portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
 		voltha.OperStatus_Types(operStatus.Val)); err != nil {
 		// If the error doesn't change behavior and is essentially ignored, it is not an error, it is a
@@ -747,16 +582,6 @@
 	}
 	logger.Debugw("DeleteAllPorts", log.Fields{"deviceID": deviceID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id); err != nil {
 		logger.Debugw("unable-to-delete-ports", log.Fields{"error": err})
 		return nil, err
@@ -790,16 +615,6 @@
 	}
 	logger.Debugw("ChildDevicesLost", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
 		logger.Debugw("unable-to-disable-child-devices", log.Fields{"error": err})
 		return nil, err
@@ -833,16 +648,6 @@
 	}
 	logger.Debugw("ChildDevicesDetected", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
 		logger.Debugw("child-devices-detection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
 		return nil, err
@@ -881,16 +686,6 @@
 	}
 	logger.Debugw("PortCreated", log.Fields{"deviceID": deviceID.Id, "port": port, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port); err != nil {
 		logger.Debugw("unable-to-add-port", log.Fields{"error": err})
 		return nil, err
@@ -924,16 +719,6 @@
 	logger.Debugw("DevicePMConfigUpdate", log.Fields{"deviceID": pmConfigs.Id, "configs": pmConfigs,
 		"transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pmConfigs.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
 		logger.Debugw("unable-to-initialize-pm-configs", log.Fields{"error": err})
 		return nil, err
@@ -979,18 +764,6 @@
 	logger.Debugw("PacketIn", log.Fields{"deviceID": deviceID.Id, "port": portNo.Val, "packet": packet,
 		"transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	// TODO: If this adds too much latencies then needs to remove transaction and let OFAgent filter out
-	// duplicates.
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
 		logger.Debugw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
 		return nil, err
@@ -1031,16 +804,6 @@
 	logger.Debugw("UpdateImageDownload", log.Fields{"deviceID": deviceID.Id, "image-download": img,
 		"transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
 		logger.Debugw("unable-to-update-image-download", log.Fields{"error": err})
 		return nil, err
@@ -1073,16 +836,6 @@
 	}
 	logger.Debugw("ReconcileChildDevices", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
-		if err != nil {
-			logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
 		logger.Debugw("unable-to-reconcile-child-devices", log.Fields{"error": err})
 		return nil, err
@@ -1122,16 +875,6 @@
 	logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceID.Id, "reason": reason.Val,
 		"transactionID": transactionID.Val})
 
-	// Try to grab the transaction as this core may be competing with another Core
-	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
-		if err != nil {
-			logger.Debugw("DeviceReasonUpdate: Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
-			return nil, err
-		}
-		defer txn.Close(context.TODO())
-	}
-
 	if err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
 		logger.Debugw("unable-to-update-device-reason", log.Fields{"error": err})
 		return nil, err