VOL-2658, VOL-2840 - remove core compete code
Change-Id: Ic34d030bc805c6948369c65db6f77f9739320570
(cherry picked from commit fd27f4b45b3c8d89174a8da7192598b768b4907d)
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 9706cfb..712f51d 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -19,12 +19,13 @@
import (
"context"
"fmt"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"reflect"
"sync"
"time"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -218,57 +219,6 @@
}
}
-//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
-
- if adapterAgent, ok := aMgr.adapterAgents[adapter.Id]; ok {
- if adapterAgent.getAdapter() != nil {
- // Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
- go func() {
- err := aMgr.deviceMgr.adapterRestarted(ctx, adapter)
- if err != nil {
- logger.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
- }
- }()
- return
- }
- }
-
- // Update the adapters
- adaptersIf, err := aMgr.clusterDataProxy.List(ctx, "/adapters", 0, false, "")
- if err != nil {
- logger.Errorw("failed-to-list-adapters-from-cluster-proxy", log.Fields{"error": err})
- return
- }
- if adaptersIf != nil {
- for _, adapterIf := range adaptersIf.([]interface{}) {
- if adapter, ok := adapterIf.(*voltha.Adapter); ok {
- logger.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
- aMgr.updateAdapterWithoutLock(adapter)
- }
- }
- }
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- // Update the device types
- deviceTypesIf, err := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
- if err != nil {
- logger.Errorw("Failed-to-list-device-types-in-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- if deviceTypesIf != nil {
- dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
- for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
- if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
- logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
- aMgr.updateDeviceTypeWithoutLock(dType)
- }
- }
- }
-}
-
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 3c1c5d0..9ea44a1 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -19,15 +19,15 @@
import (
"context"
"errors"
+ "time"
+
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "time"
)
// AdapterRequestHandlerProxy represent adapter request handler proxy attributes
@@ -40,13 +40,12 @@
clusterDataProxy *model.Proxy
defaultRequestTimeout time.Duration
longRunningRequestTimeout time.Duration
- coreInCompetingMode bool
core *Core
}
// NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
func NewAdapterRequestHandlerProxy(core *Core, coreInstanceID string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
- aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout time.Duration,
+ aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, longRunningRequestTimeout time.Duration,
defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
proxy.core = core
@@ -56,50 +55,11 @@
proxy.clusterDataProxy = cdProxy
proxy.localDataProxy = ldProxy
proxy.adapterMgr = aMgr
- proxy.coreInCompetingMode = incompetingMode
proxy.defaultRequestTimeout = defaultRequestTimeout
proxy.longRunningRequestTimeout = longRunningRequestTimeout
return &proxy
}
-// This is a helper function that attempts to acquire the request by using the device ownership model
-func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
- timeout := rhp.defaultRequestTimeout.Milliseconds()
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn := NewKVTransaction(transactionID)
- if txn == nil {
- return nil, errors.New("fail-to-create-transaction")
- }
-
- var acquired bool
- var err error
- if devID != "" {
- var ownedByMe bool
- if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: devID}); err != nil {
- logger.Warnw("getting-ownership-failed", log.Fields{"deviceId": devID, "error": err})
- return nil, kafka.ErrorTransactionInvalidId
- }
- acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
- } else {
- acquired, err = txn.Acquired(ctx, timeout)
- }
- if err == nil && acquired {
- logger.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
- return txn, nil
- }
- logger.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnID, "error": err})
- return nil, kafka.ErrorTransactionNotAcquired
-}
-
-// competeForTransaction is a helper function to determine whether every request needs to compete with another
-// Core to execute the request
-func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
- return rhp.coreInCompetingMode
-}
-
-// Register registers the adapter
func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -130,19 +90,6 @@
}
logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, "")
- if err != nil {
- if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
- logger.Debugw("Another core handled the request", log.Fields{"transactionId": transactionID})
- // Update our adapters in memory
- go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(context.TODO(), adapter)
- }
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
}
@@ -172,16 +119,6 @@
}
logger.Debugw("GetDevice", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
// Get the device via the device manager
device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
if err != nil {
@@ -216,18 +153,6 @@
}
logger.Debugw("DeviceUpdate", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, device.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
- logger.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
-
if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
logger.Debugw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
return nil, err
@@ -279,16 +204,6 @@
}
logger.Debugw("GetChildDevice", log.Fields{"parentDeviceID": pID.Id, "args": args, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
}
@@ -318,16 +233,6 @@
}
logger.Debugw("GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, proxyAddress.DeviceId)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
}
@@ -362,16 +267,6 @@
}
logger.Debugw("GetPorts", log.Fields{"deviceID": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
return rhp.deviceMgr.getPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
}
@@ -401,16 +296,6 @@
}
logger.Debugw("GetChildDevices", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
}
@@ -479,16 +364,6 @@
"deviceType": dt.Val, "channelID": chnlID.Val, "serialNumber": serialNumber.Val,
"vendorID": vendorID.Val, "onuID": onuID.Val, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
if err != nil {
logger.Debugw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
@@ -534,16 +409,6 @@
logger.Debugw("DeviceStateUpdate", log.Fields{"deviceID": deviceID.Id, "oper-status": operStatus,
"conn-status": connStatus, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
logger.Debugw("unable-to-update-device-status", log.Fields{"error": err})
@@ -590,16 +455,6 @@
logger.Debugw("ChildrenStateUpdate", log.Fields{"deviceID": deviceID.Id, "oper-status": operStatus,
"conn-status": connStatus, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
if err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
@@ -640,16 +495,6 @@
}
logger.Debugw("PortsStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
logger.Debugw("unable-to-update-ports-state", log.Fields{"error": err})
return nil, err
@@ -701,16 +546,6 @@
logger.Debugw("PortStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus,
"portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
voltha.OperStatus_Types(operStatus.Val)); err != nil {
// If the error doesn't change behavior and is essentially ignored, it is not an error, it is a
@@ -747,16 +582,6 @@
}
logger.Debugw("DeleteAllPorts", log.Fields{"deviceID": deviceID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id); err != nil {
logger.Debugw("unable-to-delete-ports", log.Fields{"error": err})
return nil, err
@@ -790,16 +615,6 @@
}
logger.Debugw("ChildDevicesLost", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("unable-to-disable-child-devices", log.Fields{"error": err})
return nil, err
@@ -833,16 +648,6 @@
}
logger.Debugw("ChildDevicesDetected", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("child-devices-detection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
return nil, err
@@ -881,16 +686,6 @@
}
logger.Debugw("PortCreated", log.Fields{"deviceID": deviceID.Id, "port": port, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port); err != nil {
logger.Debugw("unable-to-add-port", log.Fields{"error": err})
return nil, err
@@ -924,16 +719,6 @@
logger.Debugw("DevicePMConfigUpdate", log.Fields{"deviceID": pmConfigs.Id, "configs": pmConfigs,
"transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pmConfigs.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
logger.Debugw("unable-to-initialize-pm-configs", log.Fields{"error": err})
return nil, err
@@ -979,18 +764,6 @@
logger.Debugw("PacketIn", log.Fields{"deviceID": deviceID.Id, "port": portNo.Val, "packet": packet,
"transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- // TODO: If this adds too much latencies then needs to remove transaction and let OFAgent filter out
- // duplicates.
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
logger.Debugw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
return nil, err
@@ -1031,16 +804,6 @@
logger.Debugw("UpdateImageDownload", log.Fields{"deviceID": deviceID.Id, "image-download": img,
"transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
logger.Debugw("unable-to-update-image-download", log.Fields{"error": err})
return nil, err
@@ -1073,16 +836,6 @@
}
logger.Debugw("ReconcileChildDevices", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
- if err != nil {
- logger.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("unable-to-reconcile-child-devices", log.Fields{"error": err})
return nil, err
@@ -1122,16 +875,6 @@
logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceID.Id, "reason": reason.Val,
"transactionID": transactionID.Val})
- // Try to grab the transaction as this core may be competing with another Core
- if rhp.competeForTransaction() {
- txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
- if err != nil {
- logger.Debugw("DeviceReasonUpdate: Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
- return nil, err
- }
- defer txn.Close(context.TODO())
- }
-
if err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
logger.Debugw("unable-to-update-device-reason", log.Fields{"error": err})
return nil, err
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index f2cf3da..9771619 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -103,14 +103,6 @@
if err != nil {
panic("no kv client")
}
- // Setup KV transaction context
- txnPrefix := cf.KVStoreDataPrefix + "/transactions/"
- if err = SetTransactionContext(coreInstanceID,
- txnPrefix,
- client,
- cf.KVStoreTimeout); err != nil {
- logger.Fatal("creating-transaction-context-failed")
- }
return client
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 57d00cf..69cd3c8 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -55,7 +55,6 @@
kvClient kvstore.Client
backend db.Backend
kafkaClient kafka.Client
- deviceOwnership *DeviceOwnership
}
// NewCore creates instance of rw core
@@ -151,10 +150,6 @@
go core.startAdapterManager(ctx)
go core.monitorKvstoreLiveness(ctx)
- // Setup device ownership context
- core.deviceOwnership = NewDeviceOwnership(core.instanceID, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
- "service/voltha/owns_device", 10)
-
logger.Info("core-services-started")
return nil
}
@@ -378,7 +373,7 @@
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceID, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
- core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
// Register the broadcast topic to handle any core-bound broadcast requests
if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e8b79a6..1a1b773 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,6 +19,11 @@
import (
"context"
"errors"
+ "reflect"
+ "runtime"
+ "sync"
+ "time"
+
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -29,10 +34,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
- "runtime"
- "sync"
- "time"
)
// DeviceManager represent device manager attributes
@@ -128,13 +129,6 @@
if !ok {
return nil
}
- // Register this device for ownership tracking
- go func() {
- _, err = dMgr.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: deviceID})
- if err != nil {
- logger.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
- }
- }()
return agent.(*DeviceAgent)
}
//TODO: Change the return params to return an error as well
@@ -241,25 +235,13 @@
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if root, _ := dMgr.IsRootDevice(id); root {
// stop managing the logical device
- ldeviceID := dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
- if ldeviceID != "" { // Can happen if logical device agent was already stopped
- err := dMgr.core.deviceOwnership.AbandonDevice(ldeviceID)
- if err != nil {
- logger.Errorw("unable-to-abandon-the-device", log.Fields{"error": err})
- }
- }
- // We do not need to stop the child devices as this is taken care by the state machine.
+ _ = dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
}
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
if err := agent.stop(ctx); err != nil {
logger.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
}
dMgr.deleteDeviceAgentFromMap(agent)
- // Abandon the device ownership
- err := dMgr.core.deviceOwnership.AbandonDevice(id)
- if err != nil {
- logger.Warnw("unable-to-abandon-device", log.Fields{"error": err})
- }
}
}
}
@@ -438,10 +420,10 @@
if !device.(*voltha.Device).Root {
continue
}
- if hostPort != "" && hostPort == device.(*voltha.Device).GetHostAndPort() {
+ if hostPort != "" && hostPort == device.(*voltha.Device).GetHostAndPort() && device.(*voltha.Device).AdminState != voltha.AdminState_DELETED {
return true, nil
}
- if newDevice.MacAddress != "" && newDevice.MacAddress == device.(*voltha.Device).MacAddress {
+ if newDevice.MacAddress != "" && newDevice.MacAddress == device.(*voltha.Device).MacAddress && device.(*voltha.Device).AdminState != voltha.AdminState_DELETED {
return true, nil
}
}
@@ -627,14 +609,13 @@
logger.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
// Let's reconcile the device managed by this Core only
- rootDeviceIds := dMgr.core.deviceOwnership.GetAllDeviceIdsOwnedByMe()
- if len(rootDeviceIds) == 0 {
+ if len(dMgr.rootDevices) == 0 {
logger.Debugw("nothing-to-reconcile", log.Fields{"adapterId": adapter.Id})
return nil
}
responses := make([]utils.Response, 0)
- for _, rootDeviceID := range rootDeviceIds {
+ for rootDeviceID := range dMgr.rootDevices {
if rootDevice, _ := dMgr.getDeviceFromModel(ctx, rootDeviceID); rootDevice != nil {
if rootDevice.Adapter == adapter.Id {
if isOkToReconcile(rootDevice) {
@@ -1025,13 +1006,6 @@
}
dMgr.addDeviceAgentToMap(agent)
- // Since this Core has handled this request then it therefore owns this child device. Set the
- // ownership of this device to this Core
- _, err = dMgr.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: agent.deviceID})
- if err != nil {
- logger.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
- }
-
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
deleted file mode 100644
index 2389c78..0000000
--- a/rw_core/core/device_ownership.go
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package core
-
-import (
- "context"
- "fmt"
- "sync"
- "time"
-
- "github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-type ownership struct {
- id string
- owned bool
- chnl chan int
-}
-
-// DeviceOwnership represent device ownership attributes
-type DeviceOwnership struct {
- instanceID string
- exitChannel chan int
- kvClient kvstore.Client
- reservationTimeout int64 // Duration in seconds
- ownershipPrefix string
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
- deviceMap map[string]*ownership
- deviceMapLock sync.RWMutex
- deviceToKeyMap map[string]string
- deviceToKeyMapLock sync.RWMutex
- ownershipLock sync.RWMutex
-}
-
-// NewDeviceOwnership creates device ownership instance
-func NewDeviceOwnership(id string, kvClient kvstore.Client, deviceMgr *DeviceManager, logicalDeviceMgr *LogicalDeviceManager, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
- var deviceOwnership DeviceOwnership
- deviceOwnership.instanceID = id
- deviceOwnership.exitChannel = make(chan int, 1)
- deviceOwnership.kvClient = kvClient
- deviceOwnership.deviceMgr = deviceMgr
- deviceOwnership.logicalDeviceMgr = logicalDeviceMgr
- deviceOwnership.ownershipPrefix = ownershipPrefix
- deviceOwnership.reservationTimeout = reservationTimeout
- deviceOwnership.deviceMap = make(map[string]*ownership)
- deviceOwnership.deviceMapLock = sync.RWMutex{}
- deviceOwnership.deviceToKeyMap = make(map[string]string)
- deviceOwnership.deviceToKeyMapLock = sync.RWMutex{}
- deviceOwnership.ownershipLock = sync.RWMutex{}
- return &deviceOwnership
-}
-
-// Start starts device device ownership
-func (da *DeviceOwnership) Start(ctx context.Context) {
- logger.Info("starting-deviceOwnership", log.Fields{"instanceId": da.instanceID})
- logger.Info("deviceOwnership-started")
-}
-
-// Stop stops device ownership
-func (da *DeviceOwnership) Stop(ctx context.Context) {
- logger.Info("stopping-deviceOwnership")
- da.exitChannel <- 1
- // Need to flush all device reservations
- da.abandonAllDevices()
- logger.Info("deviceOwnership-stopped")
-}
-
-func (da *DeviceOwnership) tryToReserveKey(ctx context.Context, id string) bool {
- var currOwner string
- //Try to reserve the key
- kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
- value, err := da.kvClient.Reserve(ctx, kvKey, da.instanceID, da.reservationTimeout)
- if err != nil {
- logger.Errorw("error", log.Fields{"error": err, "id": id, "instanceId": da.instanceID})
- }
- if value != nil {
- if currOwner, err = kvstore.ToString(value); err != nil {
- logger.Error("unexpected-owner-type")
- }
- return currOwner == da.instanceID
- }
- return false
-}
-
-func (da *DeviceOwnership) renewReservation(ctx context.Context, id string) bool {
- // Try to reserve the key
- kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
- if err := da.kvClient.RenewReservation(ctx, kvKey); err != nil {
- logger.Errorw("reservation-renewal-error", log.Fields{"error": err, "instance": da.instanceID})
- return false
- }
- return true
-}
-
-func (da *DeviceOwnership) monitorOwnership(ctx context.Context, id string, chnl chan int) {
- logger.Debugw("start-device-monitoring", log.Fields{"id": id})
- op := "starting"
- exit := false
- ticker := time.NewTicker(time.Duration(da.reservationTimeout) / 3 * time.Second)
- for {
- select {
- case <-da.exitChannel:
- logger.Debugw("closing-monitoring", log.Fields{"Id": id})
- exit = true
- case <-ticker.C:
- logger.Debugw(fmt.Sprintf("%s-reservation", op), log.Fields{"Id": id})
- case <-chnl:
- logger.Debugw("closing-device-monitoring", log.Fields{"Id": id})
- exit = true
- }
- if exit {
- logger.Infow("exiting-device-monitoring", log.Fields{"Id": id})
- ticker.Stop()
- break
- }
- deviceOwned, ownedByMe := da.getOwnership(id)
- if deviceOwned && ownedByMe {
- // Device owned; renew reservation
- op = "renew"
- if da.renewReservation(ctx, id) {
- logger.Debugw("reservation-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
- } else {
- logger.Debugw("reservation-not-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
- }
- } else {
- // Device not owned or not owned by me; try to seize ownership
- op = "retry"
- if err := da.setOwnership(id, da.tryToReserveKey(ctx, id)); err != nil {
- logger.Errorw("unexpected-error", log.Fields{"error": err})
- }
- }
- }
- logger.Debugw("device-monitoring-stopped", log.Fields{"id": id})
-}
-
-func (da *DeviceOwnership) getOwnership(id string) (bool, bool) {
- da.deviceMapLock.RLock()
- defer da.deviceMapLock.RUnlock()
- if val, exist := da.deviceMap[id]; exist {
- return true, val.owned
- }
- return false, false
-}
-
-func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
- da.deviceMapLock.Lock()
- defer da.deviceMapLock.Unlock()
- if _, exist := da.deviceMap[id]; exist {
- if da.deviceMap[id].owned != owner {
- logger.Debugw("ownership-changed", log.Fields{"Id": id, "owner": owner})
- }
- da.deviceMap[id].owned = owner
- return nil
- }
- return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
-}
-
-// GetAllDeviceIdsOwnedByMe returns all the deviceIds (root device Ids) that is managed by this Core
-func (da *DeviceOwnership) GetAllDeviceIdsOwnedByMe() []string {
- deviceIds := []string{}
- da.deviceMapLock.Lock()
- defer da.deviceMapLock.Unlock()
- for _, ownership := range da.deviceMap {
- if ownership.owned {
- deviceIds = append(deviceIds, ownership.id)
- }
- }
- return deviceIds
-}
-
-// OwnedByMe returns whether this Core instance active owns this device. This function will automatically
-// trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(ctx context.Context, id interface{}) (bool, error) {
- // Retrieve the ownership key based on the id
- var ownershipKey string
- var err error
- var idStr string
- var cache bool
- if ownershipKey, idStr, cache, err = da.getOwnershipKey(ctx, id); err != nil {
- logger.Warnw("no-ownershipkey", log.Fields{"error": err})
- return false, err
- }
-
- // Update the deviceToKey map, if not from cache
- if !cache {
- da.deviceToKeyMapLock.Lock()
- da.deviceToKeyMap[idStr] = ownershipKey
- da.deviceToKeyMapLock.Unlock()
- }
-
- // Add a lock to prevent creation of two separate monitoring routines for the same device. When a NB request for a
- // device not in memory is received this results in this function being called in rapid succession, once when
- // loading the device and once when handling the NB request.
- da.ownershipLock.Lock()
- defer da.ownershipLock.Unlock()
-
- deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
- if deviceOwned {
- logger.Debugw("ownership", log.Fields{"Id": ownershipKey, "owned": ownedByMe})
- return ownedByMe, nil
- }
- // Not owned by me or maybe nobody else. Try to reserve it
- reservedByMe := da.tryToReserveKey(ctx, ownershipKey)
- myChnl := make(chan int)
-
- da.deviceMapLock.Lock()
- da.deviceMap[ownershipKey] = &ownership{
- id: ownershipKey,
- owned: reservedByMe,
- chnl: myChnl}
- da.deviceMapLock.Unlock()
-
- logger.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
- go da.monitorOwnership(context.Background(), ownershipKey, myChnl)
- return reservedByMe, nil
-}
-
-//AbandonDevice must be invoked whenever a device is deleted from the Core
-func (da *DeviceOwnership) AbandonDevice(id string) error {
- if id == "" {
- return status.Error(codes.FailedPrecondition, "id-nil")
- }
- da.deviceMapLock.Lock()
- defer da.deviceMapLock.Unlock()
- o, exist := da.deviceMap[id]
- if exist { // id is ownership key
- // Need to clean up all deviceToKeyMap entries using this device as key
- da.deviceToKeyMapLock.Lock()
- defer da.deviceToKeyMapLock.Unlock()
- for k, v := range da.deviceToKeyMap {
- if id == v {
- delete(da.deviceToKeyMap, k)
- }
- }
- // Remove the device reference from the deviceMap
- delete(da.deviceMap, id)
-
- // Stop the Go routine monitoring the device
- close(o.chnl)
- delete(da.deviceMap, id)
- logger.Debugw("abandoning-device", log.Fields{"Id": id})
- return nil
- }
- // id is not ownership key
- da.deleteDeviceKey(id)
- return nil
-}
-
-//abandonAllDevices must be invoked whenever a device is deleted from the Core
-func (da *DeviceOwnership) abandonAllDevices() {
- da.deviceMapLock.Lock()
- defer da.deviceMapLock.Unlock()
- da.deviceToKeyMapLock.Lock()
- defer da.deviceToKeyMapLock.Unlock()
- for k := range da.deviceToKeyMap {
- delete(da.deviceToKeyMap, k)
- }
- for _, val := range da.deviceMap {
- close(val.chnl)
- }
-}
-
-func (da *DeviceOwnership) deleteDeviceKey(id string) {
- da.deviceToKeyMapLock.Lock()
- defer da.deviceToKeyMapLock.Unlock()
- delete(da.deviceToKeyMap, id)
-}
-
-// getOwnershipKey returns the ownership key that the id param uses. Ownership key is the parent
-// device Id of a child device or the rootdevice of a logical device. This function also returns the
-// id in string format of the id param via the ref output as well as if the data was retrieved from cache
-func (da *DeviceOwnership) getOwnershipKey(ctx context.Context, id interface{}) (ownershipKey string, ref string, cached bool, err error) {
-
- if id == nil {
- return "", "", false, status.Error(codes.InvalidArgument, "nil-id")
- }
- da.deviceToKeyMapLock.RLock()
- defer da.deviceToKeyMapLock.RUnlock()
- var device *voltha.Device
- var lDevice *voltha.LogicalDevice
- // The id can either be a device Id or a logical device id.
- if dID, ok := id.(*utils.DeviceID); ok {
- // Use cache if present
- if val, exist := da.deviceToKeyMap[dID.ID]; exist {
- return val, dID.ID, true, nil
- }
- if device, _ = da.deviceMgr.GetDevice(ctx, dID.ID); device == nil {
- return "", dID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
- }
- if device.Root {
- return device.Id, dID.ID, false, nil
- }
- return device.ParentId, dID.ID, false, nil
- } else if ldID, ok := id.(*utils.LogicalDeviceID); ok {
- // Use cache if present
- if val, exist := da.deviceToKeyMap[ldID.ID]; exist {
- return val, ldID.ID, true, nil
- }
- if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ctx, ldID.ID); lDevice == nil {
- return "", ldID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
- }
- return lDevice.RootDeviceId, ldID.ID, false, nil
- }
- return "", "", false, status.Error(codes.NotFound, fmt.Sprintf("id-%v", id))
-}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f4786aa..30ea5ed 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -27,7 +27,6 @@
"github.com/golang/protobuf/ptypes/empty"
da "github.com/opencord/voltha-go/common/core/northbound/grpc"
- "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/version"
"github.com/opencord/voltha-protos/v3/go/common"
@@ -35,12 +34,9 @@
"github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
-var errorIDNotFound = status.Error(codes.NotFound, "id-not-found")
-
// Image related constants
const (
ImageDownload = iota
@@ -83,90 +79,6 @@
return handler
}
-// isTestMode is a helper function to determine a function is invoked for testing only
-func isTestMode(ctx context.Context) bool {
- md, _ := metadata.FromIncomingContext(ctx)
- _, exist := md[common.TestModeKeys_api_test.String()]
- return exist
-}
-
-// This function attempts to extract the serial number from the request metadata
-// and create a KV transaction for that serial number for the current core.
-func (handler *APIHandler) createKvTransaction(ctx context.Context) (*KVTransaction, error) {
- var (
- err error
- ok bool
- md metadata.MD
- serNum []string
- )
- if md, ok = metadata.FromIncomingContext(ctx); !ok {
- err = errors.New("metadata-not-found")
- } else if serNum, ok = md["voltha_serial_number"]; !ok {
- err = errors.New("serial-number-not-found")
- }
- if !ok || serNum == nil {
- logger.Error(err)
- return nil, err
- }
- // Create KV transaction
- txn := NewKVTransaction(serNum[0])
- return txn, nil
-}
-
-// isOFControllerRequest is a helper function to determine if a request was initiated
-// from the OpenFlow controller (or its proxy, e.g. OFAgent)
-func (handler *APIHandler) isOFControllerRequest(ctx context.Context) bool {
- if md, ok := metadata.FromIncomingContext(ctx); ok {
- // Metadata in context
- if _, ok = md[handler.core.config.CoreBindingKey]; ok {
- // OFAgent field in metadata
- logger.Debug("OFController-request")
- return true
- }
- }
- logger.Debug("not-OFController-request")
- return false
-}
-
-// competeForTransaction is a helper function to determine whether every request needs to compete with another
-// Core to execute the request
-func (handler *APIHandler) competeForTransaction() bool {
- return handler.coreInCompetingMode
-}
-
-// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
-// acquisition. If the device is owned by this Core (in a core-pair) then acquire the transaction with a
-// timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
-// core-pair to proceed with the it). If the device is not owned then this Core will just monitor the transaction
-// for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...time.Duration) (*KVTransaction, error) {
- timeout := handler.defaultRequestTimeout
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return nil, err
- }
- var acquired bool
- if id != nil {
- var ownedByMe bool
- if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(ctx, id); err != nil {
- logger.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
- return nil, errorIDNotFound
- }
- acquired, err = txn.Acquired(ctx, timeout.Milliseconds(), ownedByMe)
- } else {
- acquired, err = txn.Acquired(ctx, timeout.Milliseconds())
- }
- if err == nil && acquired {
- logger.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
- return txn, nil
- }
- logger.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnID, "error": err})
- return nil, errorTransactionNotAcquired
-}
-
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel monitorCh where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -205,30 +117,12 @@
func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
logger.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &voltha.LogicalPort{}, err
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.getLogicalPort(ctx, id)
}
// EnableLogicalDevicePort enables logical device port
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
logger.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -239,17 +133,6 @@
// DisableLogicalDevicePort disables logical device port
func (handler *APIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
logger.Debugw("DisableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -260,17 +143,6 @@
// UpdateLogicalDeviceFlowTable updates logical device flow table
func (handler *APIHandler) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceFlowTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: flow.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -281,18 +153,6 @@
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
func (handler *APIHandler) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceFlowGroupTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: flow.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
@@ -321,20 +181,12 @@
// ListDeviceIds returns the list of device ids managed by a voltha core
func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
logger.Debug("ListDeviceIDs")
- if isTestMode(ctx) {
- return &voltha.IDs{Items: make([]*voltha.ID, 0)}, nil
- }
return handler.deviceMgr.ListDeviceIds()
}
//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
logger.Debug("ReconcileDevices")
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- // No need to grab a transaction as this request is core specific
ch := make(chan interface{})
defer close(ch)
@@ -345,31 +197,12 @@
// GetLogicalDevice provides a cloned most up to date logical device
func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
logger.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &voltha.LogicalDevice{}, err
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.getLogicalDevice(ctx, id.Id)
}
// ListLogicalDevices returns the list of all logical devices
func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
logger.Debug("ListLogicalDevices-request")
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, nil)
- if err != nil {
- return &voltha.LogicalDevices{}, err
- }
- defer txn.Close(ctx)
- if handler.isOFControllerRequest(ctx) {
- // Since an OF controller is only interested in the set of logical devices managed by thgis Core then return
- // only logical devices managed/monitored by this Core.
- return handler.logicalDeviceMgr.listManagedLogicalDevices(ctx)
- }
- }
return handler.logicalDeviceMgr.listLogicalDevices(ctx)
}
@@ -382,39 +215,18 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("ListLogicalDeviceFlows", log.Fields{"id": *id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &openflow_13.Flows{}, err
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"id": *id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &openflow_13.FlowGroups{}, err
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
}
// ListLogicalDevicePorts returns ports of logical device
func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &voltha.LogicalPorts{}, err
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
}
@@ -425,18 +237,6 @@
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
logger.Debugw("create-device", log.Fields{"device": *device})
- if isTestMode(ctx) {
- return &voltha.Device{Id: device.Id}, nil
- }
-
- if handler.competeForTransaction() {
- // There are no device Id present in this function.
- txn, err := handler.takeRequestOwnership(ctx, nil)
- if err != nil {
- return &voltha.Device{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -449,10 +249,6 @@
return nil, err
}
if d, ok := res.(*voltha.Device); ok {
- _, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: d.Id})
- if err != nil {
- logger.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
- }
return d, nil
}
}
@@ -468,17 +264,6 @@
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (handler *APIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("enabledevice", log.Fields{"id": id})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id}, handler.longRunningRequestTimeout)
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -489,17 +274,6 @@
// DisableDevice disables a device along with any child device it may have
func (handler *APIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("disabledevice-request", log.Fields{"id": id})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -510,17 +284,6 @@
//RebootDevice invoked the reboot API to the corresponding adapter
func (handler *APIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("rebootDevice-request", log.Fields{"id": id})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -531,23 +294,6 @@
// DeleteDevice removes a device from the data model
func (handler *APIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("deletedevice-request", log.Fields{"id": id})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
- if err != nil {
- if err == errorTransactionNotAcquired {
- if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: id.Id}); !ownedByMe && err == nil {
- // Remove the device in memory
- handler.deviceMgr.stopManagingDevice(ctx, id.Id)
- }
- }
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -558,14 +304,6 @@
// ListDevicePorts returns the ports details for a specific device entry
func (handler *APIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
logger.Debugw("listdeviceports-request", log.Fields{"id": id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
- if err != nil {
- return &voltha.Ports{}, err
- }
- defer txn.Close(ctx)
- }
-
device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
return &voltha.Ports{}, err
@@ -578,13 +316,6 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (handler *APIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("listdeviceflows-request", log.Fields{"id": id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
- if err != nil {
- return &openflow_13.Flows{}, err
- }
- defer txn.Close(ctx)
- }
device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
@@ -660,18 +391,6 @@
// processImageRequest is a helper method to execute an image download request
func (handler *APIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
logger.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
- if isTestMode(ctx) {
- resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- return resp, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: img.Id})
- if err != nil {
- return &common.OperationResp{}, err
- }
- defer txn.Close(ctx)
- }
failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
@@ -711,10 +430,6 @@
// DownloadImage execute an image download request
func (handler *APIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("DownloadImage-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- return resp, nil
- }
return handler.processImageRequest(ctx, img, ImageDownload)
}
@@ -722,53 +437,27 @@
// CancelImageDownload cancels image download request
func (handler *APIHandler) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("cancelImageDownload-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- return resp, nil
- }
return handler.processImageRequest(ctx, img, CancelImageDownload)
}
// ActivateImageUpdate activates image update request
func (handler *APIHandler) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("activateImageUpdate-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- return resp, nil
- }
-
return handler.processImageRequest(ctx, img, ActivateImage)
}
// RevertImageUpdate reverts image update
func (handler *APIHandler) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("revertImageUpdate-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- return resp, nil
- }
-
return handler.processImageRequest(ctx, img, RevertImage)
}
// GetImageDownloadStatus returns status of image download
func (handler *APIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
logger.Debugw("getImageDownloadStatus-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_SUCCEEDED}
- return resp, nil
- }
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: img.Id})
- if err != nil {
- return failedresponse, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.getImageDownloadStatus(ctx, img, ch)
@@ -794,10 +483,6 @@
// GetImageDownload returns image download
func (handler *APIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
logger.Debugw("GetImageDownload-request", log.Fields{"img": *img})
- if isTestMode(ctx) {
- resp := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_SUCCEEDED}
- return resp, nil
- }
download, err := handler.deviceMgr.getImageDownload(ctx, img)
if err != nil {
@@ -809,10 +494,6 @@
// ListImageDownloads returns image downloads
func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
logger.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
- if isTestMode(ctx) {
- resp := &voltha.ImageDownloads{Items: []*voltha.ImageDownload{}}
- return resp, nil
- }
downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id)
if err != nil {
@@ -839,16 +520,6 @@
// UpdateDevicePmConfigs updates the PM configs
func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
logger.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: configs.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
ch := make(chan interface{})
defer close(ch)
@@ -859,13 +530,6 @@
// ListDevicePmConfigs returns pm configs of device
func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
logger.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &voltha.PmConfigs{}, err
- }
- defer txn.Close(ctx)
- }
return handler.deviceMgr.listPmConfigs(ctx, id.Id)
}
@@ -898,10 +562,6 @@
func (handler *APIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
logger.Debugw("SelfTest-request", log.Fields{"id": id})
- if isTestMode(ctx) {
- resp := &voltha.SelfTestResponse{Result: voltha.SelfTestResponse_SUCCESS}
- return resp, nil
- }
return &voltha.SelfTestResponse{}, errors.New("UnImplemented")
}
@@ -910,12 +570,10 @@
//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
// request. For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
- if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.LogicalDeviceID{ID: packet.Id}); ownedByMe && err == nil {
- if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
- agent.packetOut(ctx, packet.PacketOut)
- } else {
- logger.Errorf("No logical device agent present", log.Fields{"logicaldeviceID": packet.Id})
- }
+ if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
+ agent.packetOut(ctx, packet.PacketOut)
+ } else {
+ logger.Errorf("No logical device agent present", log.Fields{"logicaldeviceID": packet.Id})
}
}
@@ -1095,13 +753,6 @@
func (handler *APIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
logger.Debugw("ListLogicalDeviceMeters", log.Fields{"id": *id})
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: id.Id})
- if err != nil {
- return &openflow_13.Meters{}, err // TODO: Return empty meter entry
- }
- defer txn.Close(ctx)
- }
return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
}
@@ -1124,19 +775,6 @@
) (*common.OperationResp, error) {
logger.Debugw("SimulateAlarm-request", log.Fields{"id": in.Id})
successResp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- if isTestMode(ctx) {
- return successResp, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: in.Id}, handler.longRunningRequestTimeout)
- if err != nil {
- failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
- return failedresponse, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.simulateAlarm(ctx, in, ch)
@@ -1147,18 +785,6 @@
func (handler *APIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceMeterTable-request",
log.Fields{"meter": meter, "test": common.TestModeKeys_api_test.String()})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{ID: meter.Id})
- if err != nil {
- return &empty.Empty{}, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
@@ -1177,18 +803,6 @@
func (handler *APIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("EnablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
- if err != nil {
- return nil, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enablePort(ctx, port, ch)
@@ -1198,18 +812,6 @@
func (handler *APIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("DisablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- if isTestMode(ctx) {
- return &empty.Empty{}, nil
- }
-
- if handler.competeForTransaction() {
- txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
- if err != nil {
- return nil, err
- }
- defer txn.Close(ctx)
- }
-
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.disablePort(ctx, port, ch)
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
index 399bf1d..cb87377 100644
--- a/rw_core/core/id.go
+++ b/rw_core/core/id.go
@@ -17,41 +17,28 @@
package core
import (
- "crypto/rand"
- "encoding/hex"
"errors"
"fmt"
- m "math/rand"
+ "math/rand"
"strconv"
+
+ "github.com/google/uuid"
)
-func randomHex(n int) (string, error) {
- bytes := make([]byte, n)
- if _, err := rand.Read(bytes); err != nil {
- return "", err
- }
- return hex.EncodeToString(bytes), nil
-}
-
-// CreateDeviceID produces a device ID. DeviceId is 16 hex long - lower 12 hex is the device id.
-// TODO: A cluster unique ID may be required
+// CreateDeviceID produces a device ID. The device ID is a UUID
func CreateDeviceID() string {
- val, _ := randomHex(12)
- return val
+ return uuid.New().String()
}
-// CreateLogicalDeviceID is not used for now as the logical device ID is derived from the
-// OLT MAC address
+// CreateLogicalDeviceID produces a logical device ID. The logical device ID is a UUID
func CreateLogicalDeviceID() string {
- // logical device id is 16 hex long - lower 12 hex is the logical device id. For now just generate the 12 hex
- val, _ := randomHex(12)
- return val
+ return uuid.New().String()
}
// CreateLogicalPortID produces a random port ID for a logical device.
func CreateLogicalPortID() uint32 {
// A logical port is a uint32
- return m.Uint32()
+ return rand.Uint32()
}
// CreateDataPathID creates uint64 pathid from string pathid
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 743614a..9dc873b 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,6 +21,10 @@
"encoding/hex"
"errors"
"fmt"
+ "reflect"
+ "sync"
+ "time"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
@@ -33,9 +37,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
- "sync"
- "time"
)
const (
@@ -45,6 +46,7 @@
// LogicalDeviceAgent represent attributes of logical device agent
type LogicalDeviceAgent struct {
logicalDeviceID string
+ serialNumber string
rootDeviceID string
deviceMgr *DeviceManager
ldeviceMgr *LogicalDeviceManager
@@ -67,12 +69,12 @@
stopOnce sync.Once
}
-func newLogicalDeviceAgent(id string, deviceID string, ldeviceMgr *LogicalDeviceManager,
- deviceMgr *DeviceManager,
- cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
+func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalDeviceManager,
+ deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceID = id
+ agent.serialNumber = sn
agent.rootDeviceID = deviceID
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
@@ -81,7 +83,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
- agent.requestQueue = coreutils.NewRequestQueue(agent.logicalDeviceID, maxOrderedLogicalDeviceRequestQueueSize)
+ agent.requestQueue = coreutils.NewRequestQueue(agent.serialNumber, maxOrderedLogicalDeviceRequestQueueSize)
return &agent
}
@@ -118,7 +120,7 @@
// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
var datapathID uint64
- if datapathID, err = CreateDataPathID(agent.logicalDeviceID); err != nil {
+ if datapathID, err = CreateDataPathID(agent.serialNumber); err != nil {
return err
}
ld.DatapathId = datapathID
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 53e355e..3f40d6a 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -17,6 +17,11 @@
import (
"context"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/config"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -27,10 +32,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
- "math/rand"
- "sync"
- "testing"
- "time"
)
func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
@@ -484,7 +485,7 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
lDeviceAgent.requestQueue.Start()
added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 070d4ef..af76258 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -19,6 +19,10 @@
import (
"context"
"errors"
+ "strings"
+ "sync"
+ "time"
+
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -27,9 +31,6 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "strings"
- "sync"
- "time"
)
// LogicalDeviceManager represent logical device manager attributes
@@ -132,20 +133,6 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
-func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
- logger.Debug("listManagedLogicalDevices")
- result := &voltha.LogicalDevices{}
- ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool {
- agent := value.(*LogicalDeviceAgent)
- if ld, _ := agent.GetLogicalDevice(ctx); ld != nil {
- result.Items = append(result.Items, ld)
- }
- return true
- })
-
- return result, nil
-}
-
//listLogicalDevices returns the list of all logical devices
func (ldMgr *LogicalDeviceManager) listLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
logger.Debug("ListAllLogicalDevices")
@@ -174,15 +161,16 @@
// For now use the serial number - it may contain any combination of alphabetic characters and numbers,
// with length varying from eight characters to a maximum of 14 characters. Mac Address is part of oneof
// in the Device model. May need to be moved out.
- macAddress := device.MacAddress
- id := strings.Replace(macAddress, ":", "", -1)
+ id := CreateLogicalDeviceID()
+ sn := strings.Replace(device.MacAddress, ":", "", -1)
if id == "" {
- logger.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id})
+ logger.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id, "serial-number": sn})
return nil, errors.New("mac-address-not-set")
}
+
logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ agent := newLogicalDeviceAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
ldMgr.addLogicalDeviceAgentToMap(agent)
// Update the root device with the logical device Id reference
@@ -255,7 +243,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalDeviceAgent(lDeviceID, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ agent := newLogicalDeviceAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
if err := agent.start(ctx, true); err != nil {
return err
}
@@ -303,10 +291,6 @@
}
//Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceID)
- err := ldMgr.core.deviceOwnership.AbandonDevice(logDeviceID)
- if err != nil {
- logger.Errorw("unable-to-abandon-the-device", log.Fields{"error": err})
- }
}
logger.Debug("deleting-logical-device-ends")
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
deleted file mode 100644
index a353ef2..0000000
--- a/rw_core/core/transaction.go
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Two voltha cores receive the same request; each tries to acquire ownership of the request
- * by writing its identifier (e.g. container name or pod name) to the transaction key named
- * after the serial number of the request. The core that loses the race for acquisition
- * monitors the progress of the core actually serving the request by watching for changes
- * in the value of the transaction key. Once the request is complete, the
- * serving core closes the transaction by invoking the KVTransaction's Close method, which
- * replaces the value of the transaction (i.e. serial number) key with the string
- * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
- * and then deletes the transaction key.
- *
- */
-
-package core
-
-import (
- "context"
- "time"
-
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-// Transaction acquisition results
-const (
- UNKNOWN = iota
- SeizedBySelf
- CompletedByOther
- AbandonedByOther
- AbandonedWatchBySelf
-)
-
-var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
-
-// Transaction constant
-const (
- TransactionComplete = "TRANSACTION-COMPLETE"
-)
-
-// Transaction constants used to guarantee the Core processing a request hold on to the transaction until
-// it either completes it (either successfully or times out) or the Core itself crashes (
-// e.g. a server failure).
-// If a request has a timeout of x seconds then the Core processing the request will renew the transaction lease
-// every x/NUM_TXN_RENEWAL_PER_REQUEST seconds. After the Core completes the request it stops renewing the
-// transaction and sets the transaction value to TRANSACTION_COMPLETE. If the processing Core crashes then it
-// will not renew the transaction causing the KV store to delete the transaction after its renewal period. The
-// Core watching the transaction will then take over.
-// Since the MIN_TXN_RENEWAL_INTERVAL_IN_SEC is 3 seconds then for any transaction that completes within 3 seconds
-// there won't be a transaction renewal done.
-const (
- NumTxnRenewalPerRequest = 2
- MinTxnRenewalIntervalInSec = 3
- MinTxnReservationDurationInSec = 5
-)
-
-// TransactionContext represent transaction context attributes
-type TransactionContext struct {
- kvClient kvstore.Client
- kvOperationTimeout int
- owner string
- txnPrefix string
-}
-
-var ctx *TransactionContext
-
-var txnState = []string{
- "UNKNOWN",
- "SEIZED-BY-SELF",
- "COMPLETED-BY-OTHER",
- "ABANDONED-BY-OTHER",
- "ABANDONED_WATCH_BY_SELF"}
-
-// NewTransactionContext creates transaction context instance
-func NewTransactionContext(
- owner string,
- txnPrefix string,
- kvClient kvstore.Client,
- kvOpTimeout int) *TransactionContext {
-
- return &TransactionContext{
- owner: owner,
- txnPrefix: txnPrefix,
- kvClient: kvClient,
- kvOperationTimeout: kvOpTimeout}
-}
-
-/*
- * Before instantiating a KVTransaction, a TransactionContext must be created.
- * The parameters stored in the context govern the behavior of all KVTransaction
- * instances.
- *
- * :param owner: The owner (i.e. voltha core name) of a transaction
- * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
- * will be created (e.g. "service/voltha/transactions")
- * :param kvClient: The client API used for all interactions with the KV store. Currently
- * only the etcd client is supported.
- * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
- * used by this package
- */
-
-// SetTransactionContext creates new transaction context
-func SetTransactionContext(owner string,
- txnPrefix string,
- kvClient kvstore.Client,
- kvOpTimeout int) error {
-
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout)
- return nil
-}
-
-// KVTransaction represent KV transaction attributes
-type KVTransaction struct {
- monitorCh chan int
- txnID string
- txnKey string
-}
-
-/*
- * A KVTransaction constructor
- *
- * :param txnId: The serial number of a voltha request.
- * :return: A KVTransaction instance
- */
-
-// NewKVTransaction creates KV transaction instance
-func NewKVTransaction(txnID string) *KVTransaction {
- return &KVTransaction{
- txnID: txnID,
- txnKey: ctx.txnPrefix + txnID}
-}
-
-/*
- * Acquired is invoked by a Core, upon reception of a request, to reserve the transaction key in the KV store. The
- * request may be resource specific (i.e will include an ID for that resource) or may be generic (i.e. list a set of
- * resources). If the request is resource specific then this function should be invoked with the ownedByMe flag to
- * indicate whether this Core owns this resource. In the case where this Core owns this resource or it is a generic
- * request then we will proceed to reserve the transaction key in the KV store for a minimum time specified by the
- * minDuration param. If the reservation request fails (i.e. the other Core got the reservation before this one - this
- * can happen only for generic request) then the Core will start to watch for changes to the key to determine
- * whether the other Core completed the transaction successfully or the Core just died. If the Core does not own the
- * resource then we will proceed to watch the transaction key.
- *
- * :param minDuration: minimum time to reserve the transaction key in the KV store
- * :param ownedByMe: specify whether the request is about a resource owned or not. If it's absent then this is a
- * generic request that has no specific resource ID (e.g. list)
- *
- * :return: A boolean specifying whether the resource was acquired. An error is return in case this function is invoked
- * for a resource that is nonexistent.
- */
-
-// Acquired aquires transaction status
-func (c *KVTransaction) Acquired(ctx context.Context, minDuration int64, ownedByMe ...bool) (bool, error) {
- var acquired bool
- var currOwner string
- var res int
-
- // Convert milliseconds to seconds, rounding up
- // The reservation TTL is specified in seconds
- durationInSecs := minDuration / 1000
- if remainder := minDuration % 1000; remainder > 0 {
- durationInSecs++
- }
- if durationInSecs < int64(MinTxnReservationDurationInSec) {
- durationInSecs = int64(MinTxnReservationDurationInSec)
- }
- genericRequest := true
- resourceOwned := false
- if len(ownedByMe) > 0 {
- genericRequest = false
- resourceOwned = ownedByMe[0]
- }
- if resourceOwned || genericRequest {
- // Keep the reservation longer that the minDuration (which is really the request timeout) to ensure the
- // transaction key stays in the KV store until after the Core finalize a request timeout condition (which is
- // a success from a request completion perspective).
- if err := c.tryToReserveTxn(ctx, durationInSecs*2); err == nil {
- res = SeizedBySelf
- } else {
- logger.Debugw("watch-other-server",
- log.Fields{"transactionId": c.txnID, "owner": currOwner, "timeout": durationInSecs})
- res = c.Watch(ctx, durationInSecs)
- }
- } else {
- res = c.Watch(ctx, durationInSecs)
- }
- switch res {
- case SeizedBySelf, AbandonedByOther:
- acquired = true
- default:
- acquired = false
- }
- logger.Debugw("acquire-transaction-status", log.Fields{"transactionId": c.txnID, "acquired": acquired, "result": txnState[res]})
- return acquired, nil
-}
-
-func (c *KVTransaction) tryToReserveTxn(ctxt context.Context, durationInSecs int64) error {
- var currOwner string
- var res int
- var err error
- value, _ := ctx.kvClient.Reserve(ctxt, c.txnKey, ctx.owner, durationInSecs)
- if value != nil {
- if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
- logger.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnID, "error": err})
- return err
- }
- if currOwner == ctx.owner {
- logger.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnID, "result": txnState[res]})
- // Setup the monitoring channel
- c.monitorCh = make(chan int)
- go c.holdOnToTxnUntilProcessingCompleted(ctxt, c.txnKey, ctx.owner, durationInSecs)
- return nil
- }
- }
- return status.Error(codes.PermissionDenied, "reservation-denied")
-}
-
-// Watch watches transaction
-func (c *KVTransaction) Watch(ctxt context.Context, durationInSecs int64) int {
- var res int
- events := ctx.kvClient.Watch(ctxt, c.txnKey, false)
- defer ctx.kvClient.CloseWatch(c.txnKey, events)
-
- transactionWasAcquiredByOther := false
-
- //Check whether the transaction was already completed by the other Core before we got here.
- if kvp, _ := ctx.kvClient.Get(ctxt, c.txnKey); kvp != nil {
- transactionWasAcquiredByOther = true
- if val, err := kvstore.ToString(kvp.Value); err == nil {
- if val == TransactionComplete {
- res = CompletedByOther
- // Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
- err = c.Delete(ctxt)
- if err != nil {
- logger.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
- }
- return res
- }
- } else {
- // An unexpected value - let's get out of here as something did not go according to plan
- res = AbandonedWatchBySelf
- logger.Debugw("cannot-read-transaction-value", log.Fields{"txn": c.txnID, "error": err})
- return res
- }
- }
-
- for {
- select {
- case event := <-events:
- transactionWasAcquiredByOther = true
- logger.Debugw("received-event", log.Fields{"txn": c.txnID, "type": event.EventType})
- if event.EventType == kvstore.DELETE {
- // The other core failed to process the request
- res = AbandonedByOther
- } else if event.EventType == kvstore.PUT {
- key, e1 := kvstore.ToString(event.Key)
- val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && e2 == nil && key == c.txnKey {
- if val == TransactionComplete {
- res = CompletedByOther
- // Successful request completion has been detected. Remove the transaction key
- err := c.Delete(ctxt)
- if err != nil {
- logger.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
- }
- } else {
- logger.Debugw("Ignoring-PUT-event", log.Fields{"val": val, "key": key})
- continue
- }
- } else {
- logger.Warnw("received-unexpected-PUT-event", log.Fields{"txn": c.txnID, "key": key, "ctxKey": c.txnKey})
- }
- }
- case <-time.After(time.Duration(durationInSecs) * time.Second):
- // Corner case: In the case where the Core owning the device dies and before this Core takes ownership of
- // this device there is a window where new requests will end up being watched instead of being processed.
- // Grab the request if the other Core did not create the transaction in the KV store.
- // TODO: Use a peer-monitoring probe to switch over (still relies on the probe frequency). This will
- // guarantee that the peer is actually gone instead of limiting the time the peer can get hold of a
- // request.
- if !transactionWasAcquiredByOther {
- logger.Debugw("timeout-no-peer", log.Fields{"txId": c.txnID})
- res = AbandonedByOther
- } else {
- continue
- }
- }
- break
- }
- return res
-}
-
-// Close closes transaction
-func (c *KVTransaction) Close(ctxt context.Context) error {
- logger.Debugw("close", log.Fields{"txn": c.txnID})
- // Stop monitoring the key (applies only when there has been no transaction switch over)
- if c.monitorCh != nil {
- close(c.monitorCh)
- err := ctx.kvClient.Put(ctxt, c.txnKey, TransactionComplete)
-
- if err != nil {
- logger.Errorw("unable-to-write-a-key-value-pair-to-the-KV-store", log.Fields{"error": err})
- }
- }
- return nil
-}
-
-// Delete deletes transaction
-func (c *KVTransaction) Delete(ctxt context.Context) error {
- logger.Debugw("delete", log.Fields{"txn": c.txnID})
- return ctx.kvClient.Delete(ctxt, c.txnKey)
-}
-
-// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete. durationInSecs
-// is used to calculate the frequency at which the Core processing the transaction renews the lease. This function
-// exits only when the transaction is Closed, i.e completed.
-func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(ctxt context.Context, key string, owner string, durationInSecs int64) {
- logger.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnID})
- renewInterval := durationInSecs / NumTxnRenewalPerRequest
- if renewInterval < MinTxnRenewalIntervalInSec {
- renewInterval = MinTxnRenewalIntervalInSec
- }
-forLoop:
- for {
- select {
- case <-c.monitorCh:
- logger.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnID})
- break forLoop
- case <-time.After(time.Duration(renewInterval) * time.Second):
- if err := ctx.kvClient.RenewReservation(ctxt, c.txnKey); err != nil {
- // Log and continue.
- logger.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
- }
- }
- }
-}