[VOL-1512] Set device ownership
This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively process a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watch for
updates on that device and will take over in case the owner Core
failed to process the transaction.
2) Cleanup the lock mechanisms to ensure we use a read lock when
needed instead of just a lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tune up.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK. This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.
Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/compose/rw_core_concurrency_test.yml b/compose/rw_core_concurrency_test.yml
index 8f6f35e..c672cfd 100644
--- a/compose/rw_core_concurrency_test.yml
+++ b/compose/rw_core_concurrency_test.yml
@@ -31,9 +31,9 @@
- -rw_core_topic=rwcore
- -kv_store_data_prefix=service/voltha
- -in_competing_mode=true
- - -timeout_long_request=3000
- - -timeout_request=300
- - -log_level=0
+ - -timeout_long_request=5000
+ - -timeout_request=6000
+ - -log_level=4
ports:
- 50057:50057
volumes:
@@ -57,9 +57,9 @@
- -rw_core_topic=rwcore
- -kv_store_data_prefix=service/voltha
- -in_competing_mode=true
- - -timeout_long_request=3000
- - -timeout_request=300
- - -log_level=0
+ - -timeout_long_request=5000
+ - -timeout_request=6000
+ - -log_level=4
ports:
- 50058:50057
volumes:
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 42ab83c..887a167 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -184,33 +184,12 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- // count keys about to be deleted
- gresp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
- if err != nil {
- log.Error(err)
- return err
- }
-
// delete the keys
- dresp, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix())
- if err != nil {
- log.Error(err)
+ if _, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix()); err != nil {
+ log.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
return err
}
-
- if dresp == nil || gresp == nil {
- log.Debug("nothing-to-delete")
- return nil
- }
-
- log.Debugw("delete-keys", log.Fields{"all-keys-deleted": int64(len(gresp.Kvs)) == dresp.Deleted})
- if int64(len(gresp.Kvs)) == dresp.Deleted {
- log.Debug("All-keys-deleted")
- } else {
- log.Error("not-all-keys-deleted")
- err := errors.New("not-all-keys-deleted")
- return err
- }
+ log.Debugw("key(s)-deleted", log.Fields{"key": key})
return nil
}
@@ -243,7 +222,7 @@
defer func() {
if !reservationSuccessful {
if err = c.ReleaseReservation(key); err != nil {
- log.Errorf("cannot-release-lease")
+ log.Error("cannot-release-lease")
}
}
}()
@@ -308,6 +287,7 @@
// ReleaseReservation releases reservation for a specific key.
func (c *EtcdClient) ReleaseReservation(key string) error {
// Get the leaseid using the key
+ log.Debugw("Release-reservation", log.Fields{"key":key})
var ok bool
var leaseID *v3Client.LeaseID
c.writeLock.Lock()
diff --git a/kafka/client.go b/kafka/client.go
index 316a4a5..9588274 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -40,9 +40,9 @@
DefaultKafkaPort = 9092
DefaultGroupName = "voltha"
DefaultSleepOnError = 1
- DefaultProducerFlushFrequency = 5
+ DefaultProducerFlushFrequency = 1
DefaultProducerFlushMessages = 1
- DefaultProducerFlushMaxmessages = 5
+ DefaultProducerFlushMaxmessages = 1
DefaultProducerReturnSuccess = true
DefaultProducerReturnErrors = true
DefaultProducerRetryMax = 3
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 1229e7a..e5e9606 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -256,7 +256,7 @@
// specific key, hence ensuring a single partition is used to publish the request. This ensures that the
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
- log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
+ log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
@@ -370,8 +370,8 @@
}
func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
+ kp.lockTopicResponseChannelMap.RLock()
+ defer kp.lockTopicResponseChannelMap.RUnlock()
_, exist := kp.topicToResponseChannelMap[topic]
return exist
}
@@ -710,10 +710,10 @@
kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
- log.Debugw("response-received", log.Fields{"msg": msg})
+ log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
go kp.dispatchResponse(msg)
} else {
- log.Warnw("unsupported-message-received", log.Fields{"msg": msg})
+ log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
}
}
@@ -726,8 +726,8 @@
}
func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
+ kp.lockTransactionIdToChannelMap.RLock()
+ defer kp.lockTransactionIdToChannelMap.RUnlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
return
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 698fefa..b0ce502 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -540,8 +540,8 @@
}
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
+ sc.lockTopicToConsumerChannelMap.RLock()
+ defer sc.lockTopicToConsumerChannelMap.RUnlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
return consumerCh
@@ -726,8 +726,8 @@
// topic via the unique channel each subscriber received during subscription
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
+ sc.lockTopicToConsumerChannelMap.RLock()
+ defer sc.lockTopicToConsumerChannelMap.RUnlock()
for _, ch := range consumerCh.channels {
go func(c chan *ic.InterContainerMessage) {
c <- protoMessage
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index 84e303c..42a36c1 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -298,6 +298,7 @@
parent_port_no=1,
child_device_type='ponsim_onu',
channel_id=vlan_id,
+ serial_number=onu.serial_number,
)
self.log.info('starting-frame-grpc-stream')
diff --git a/python/adapters/ponsim_onu/ponsim_onu.py b/python/adapters/ponsim_onu/ponsim_onu.py
index e0592b4..0810376 100644
--- a/python/adapters/ponsim_onu/ponsim_onu.py
+++ b/python/adapters/ponsim_onu/ponsim_onu.py
@@ -199,7 +199,7 @@
device.root = False
device.vendor = 'ponsim'
device.model = 'n/a'
- device.serial_number = device.id
+ device.serial_number = device.serial_number
device.mac_address = "AA:BB:CC:DD:E0:00"
yield self.core_proxy.device_update(device)
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 0be4e12..ec3f0db 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -28,15 +28,14 @@
)
const (
- SENTINEL_ADAPTER_ID = "adapter_sentinel"
+ SENTINEL_ADAPTER_ID = "adapter_sentinel"
SENTINEL_DEVICETYPE_ID = "device_type_sentinel"
-
)
type AdapterAgent struct {
- adapter *voltha.Adapter
+ adapter *voltha.Adapter
deviceTypes map[string]*voltha.DeviceType
- lock sync.RWMutex
+ lock sync.RWMutex
}
func newAdapterAgent(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *AdapterAgent {
@@ -77,14 +76,14 @@
}
func (aa *AdapterAgent) updateAdapter(adapter *voltha.Adapter) {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
+ aa.lock.Lock()
+ defer aa.lock.Unlock()
aa.adapter = adapter
}
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
+func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
+ aa.lock.Lock()
+ defer aa.lock.Unlock()
aa.deviceTypes[deviceType.Id] = deviceType
}
@@ -112,7 +111,7 @@
return &adapterMgr
}
-func (aMgr *AdapterManager) start(ctx context.Context) {
+func (aMgr *AdapterManager) start(ctx context.Context) {
log.Info("starting-adapter-manager")
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
@@ -154,7 +153,7 @@
// Load the device types
if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
- dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
+ dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
@@ -165,11 +164,10 @@
} else {
log.Debug("no-existing-device-type-found")
// No device types data. In order to have a proxy setup for that path let's create a fake device type
- aMgr.addDeviceTypes(&voltha.DeviceTypes{Items:[]*voltha.DeviceType{&voltha.DeviceType{Id:SENTINEL_DEVICETYPE_ID, Adapter:SENTINEL_ADAPTER_ID}}}, true)
+ aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{&voltha.DeviceType{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
}
}
-
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
// Update the adapters
@@ -193,7 +191,6 @@
}
}
-
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
@@ -215,7 +212,6 @@
}
}
-
func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) {
if deviceTypes == nil {
return
@@ -231,7 +227,7 @@
adapterAgent.updateDeviceType(clonedDType)
} else {
log.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
- aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id:clonedDType.Adapter}, deviceTypes)
+ aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
}
aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
}
@@ -252,9 +248,9 @@
}
func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
- result := &voltha.Adapters{Items:[]*voltha.Adapter{}}
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
+ result := &voltha.Adapters{Items: []*voltha.Adapter{}}
+ aMgr.lockAdaptersMap.RLock()
+ defer aMgr.lockAdaptersMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
if a := adapterAgent.getAdapter(); a != nil {
if a.Id != SENTINEL_ADAPTER_ID { // don't report the sentinel
@@ -272,8 +268,8 @@
}
func (aMgr *AdapterManager) getAdapter(adapterId string) *voltha.Adapter {
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
+ aMgr.lockAdaptersMap.RLock()
+ defer aMgr.lockAdaptersMap.RUnlock()
if adapterAgent, ok := aMgr.adapterAgents[adapterId]; ok {
return adapterAgent.getAdapter()
}
@@ -281,7 +277,7 @@
}
//updateAdapter updates an adapter if it exist. Otherwise, it creates it.
-func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter) {
+func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
if adapterAgent, ok := aMgr.adapterAgents[adapter.Id]; ok {
@@ -292,7 +288,7 @@
}
//updateDeviceType updates an adapter if it exist. Otherwise, it creates it.
-func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType) {
+func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
aMgr.lockdDeviceTypeToAdapterMap.Lock()
@@ -301,17 +297,17 @@
adapterAgent.updateDeviceType(deviceType)
} else {
aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
- &voltha.DeviceTypes{Items:[]*voltha.DeviceType{deviceType}})
+ &voltha.DeviceTypes{Items: []*voltha.DeviceType{deviceType}})
}
aMgr.deviceTypeToAdapterMap[deviceType.Id] = deviceType.Adapter
}
-func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *voltha.CoreInstance {
+func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *voltha.CoreInstance {
log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
if aMgr.getAdapter(adapter.Id) != nil {
// Already registered
- return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+ return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceId}
}
// Save the adapter and the device types
aMgr.addAdapter(adapter, true)
@@ -319,7 +315,7 @@
log.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
- return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+ return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceId}
}
//getAdapterName returns the name of the device adapter that service this device type
@@ -333,7 +329,7 @@
}
// getDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *AdapterManager) getDeviceType(deviceType string) *voltha.DeviceType {
+func (aMgr *AdapterManager) getDeviceType(deviceType string) *voltha.DeviceType {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
if adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
@@ -408,4 +404,4 @@
}
}
return nil
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 3d9487f..f450ca2 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -25,6 +25,7 @@
"github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -65,11 +66,11 @@
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
}
- log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn := NewKVTransaction(transactionId)
if txn == nil {
return nil, errors.New("fail-to-create-transaction")
} else if txn.Acquired(timeout) {
+ log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
return txn, nil
} else {
return nil, errors.New("failed-to-seize-request")
@@ -82,20 +83,23 @@
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
}
- log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn := NewKVTransaction(transactionId)
if txn == nil {
return nil, errors.New("fail-to-create-transaction")
}
- if rhp.core.deviceOwnership.OwnedByMe(devId) {
+ if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:devId}) {
+ log.Debugw("owned-by-me", log.Fields{"Id": devId})
if txn.Acquired(timeout) {
+ log.Debugw("processing-request", log.Fields{"Id": devId})
return txn, nil
} else {
return nil, errors.New("failed-to-seize-request")
}
} else {
+ log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
if txn.Monitor(timeout) {
+ log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
return txn, nil
} else {
return nil, errors.New("device-not-owned")
@@ -270,9 +274,10 @@
if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
return nil, status.Errorf(codes.Internal, "%s", err.Error())
} else {
- if err := rhp.deviceMgr.updateDevice(updatedDevice); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.updateDevice(updatedDevice)
+ //if err := rhp.deviceMgr.updateDevice(updatedDevice); err != nil {
+ // return nil, err
+ //}
}
return new(empty.Empty), nil
@@ -365,9 +370,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
return nil, nil
} else {
defer txn.Close()
@@ -599,10 +603,13 @@
return nil, nil
}
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
- voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+
+ //if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ // voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -659,10 +666,13 @@
}
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- if err := rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
- voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+
+ //if err := rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+ // voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -723,10 +733,14 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
- voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
- return nil, err
- }
+
+ go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ voltha.OperStatus_OperStatus(operStatus.Val))
+
+ //if err := rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ // voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -774,9 +788,10 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.addPort(deviceId.Id, port)
+ //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -827,9 +842,10 @@
return nil, nil
}
- if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+ //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -877,9 +893,10 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
+ //if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
@@ -928,8 +945,9 @@
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.updateImageDownload(deviceId.Id, img); err != nil {
- return nil, err
- }
+ go rhp.deviceMgr.updateImageDownload(deviceId.Id, img)
+ //if err := rhp.deviceMgr.updateImageDownload(deviceId.Id, img); err != nil {
+ // return nil, err
+ //}
return new(empty.Empty), nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 02f99fd..9f6adcd 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -65,10 +65,6 @@
core.kvClient = kvClient
core.kafkaClient = kafkaClient
- // Setup device ownership context
- core.deviceOwnership = NewDeviceOwnership(id, kvClient,
- "service/voltha/owns_device", 60)
-
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
@@ -106,6 +102,10 @@
go core.startGRPCService(ctx)
go core.startAdapterManager(ctx)
+ // Setup device ownership context
+ core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
+ "service/voltha/owns_device", 10)
+
log.Info("adaptercore-started")
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index b321f22..1d9480d 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -126,8 +126,8 @@
// GetDevice retrieves the latest device information from the data model
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ agent.lockDevice.RLock()
+ defer agent.lockDevice.RUnlock()
if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
@@ -538,8 +538,8 @@
}
func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ agent.lockDevice.RLock()
+ defer agent.lockDevice.RUnlock()
log.Debugw("getImageDownload", log.Fields{"id": agent.deviceId})
// Get the most up to date the device info
if device, err := agent.getDeviceWithoutLock(); err != nil {
@@ -555,8 +555,8 @@
}
func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceId string) (*voltha.ImageDownloads, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ agent.lockDevice.RLock()
+ defer agent.lockDevice.RUnlock()
log.Debugw("listImageDownloads", log.Fields{"id": agent.deviceId})
// Get the most up to date the device info
if device, err := agent.getDeviceWithoutLock(); err != nil {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 06d3bd4..b27f01a 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -100,16 +100,16 @@
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- dMgr.lockDeviceAgentsMap.Lock()
+ dMgr.lockDeviceAgentsMap.RLock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockDeviceAgentsMap.RUnlock()
return agent
} else {
// Try to load into memory - loading will also create the device agent
- dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockDeviceAgentsMap.RUnlock()
if err := dMgr.load(deviceId); err == nil {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockDeviceAgentsMap.RLock()
+ defer dMgr.lockDeviceAgentsMap.RUnlock()
if agent, ok = dMgr.deviceAgents[deviceId]; ok {
return agent
}
@@ -120,8 +120,8 @@
// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockDeviceAgentsMap.RLock()
+ defer dMgr.lockDeviceAgentsMap.RUnlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
for key, _ := range dMgr.deviceAgents {
result.Items = append(result.Items, &voltha.ID{Id: key})
@@ -302,8 +302,8 @@
}
func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockDeviceAgentsMap.RLock()
+ defer dMgr.lockDeviceAgentsMap.RUnlock()
_, exist := dMgr.deviceAgents[id]
return exist
}
@@ -627,8 +627,8 @@
dMgr.addDeviceAgentToMap(agent)
agent.start(nil, false)
- // Set device ownership
- dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
+ //// Set device ownership
+ //dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index f229383..92262ab 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -20,12 +20,18 @@
"fmt"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sync"
"time"
)
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
type ownership struct {
id string
owned bool
@@ -38,19 +44,27 @@
kvClient kvstore.Client
reservationTimeout int64 // Duration in seconds
ownershipPrefix string
+ deviceMgr *DeviceManager
+ logicalDeviceMgr *LogicalDeviceManager
deviceMap map[string]*ownership
deviceMapLock *sync.RWMutex
+ deviceToKeyMap map[string]string
+ deviceToKeyMapLock *sync.RWMutex
}
-func NewDeviceOwnership(id string, kvClient kvstore.Client, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
+func NewDeviceOwnership(id string, kvClient kvstore.Client, deviceMgr *DeviceManager, logicalDeviceMgr *LogicalDeviceManager, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
var deviceOwnership DeviceOwnership
deviceOwnership.instanceId = id
deviceOwnership.exitChannel = make(chan int, 1)
deviceOwnership.kvClient = kvClient
+ deviceOwnership.deviceMgr = deviceMgr
+ deviceOwnership.logicalDeviceMgr = logicalDeviceMgr
deviceOwnership.ownershipPrefix = ownershipPrefix
deviceOwnership.reservationTimeout = reservationTimeout
deviceOwnership.deviceMap = make(map[string]*ownership)
deviceOwnership.deviceMapLock = &sync.RWMutex{}
+ deviceOwnership.deviceToKeyMap = make(map[string]string)
+ deviceOwnership.deviceToKeyMapLock = &sync.RWMutex{}
return &deviceOwnership
}
@@ -63,14 +77,18 @@
log.Info("stopping-deviceOwnership")
da.exitChannel <- 1
// Need to flush all device reservations
+ da.abandonAllDevices()
log.Info("deviceOwnership-stopped")
}
func (da *DeviceOwnership) tryToReserveKey(id string) bool {
var currOwner string
- // Try to reserve the key
+ //Try to reserve the key
kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
value, err := da.kvClient.Reserve(kvKey, da.instanceId, da.reservationTimeout)
+ if err != nil {
+ log.Errorw("error", log.Fields{"error": err, "id": id, "instanceId": da.instanceId})
+ }
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil {
log.Error("unexpected-owner-type")
@@ -80,55 +98,61 @@
return false
}
-func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
- var op string
+func (da *DeviceOwnership) renewReservation(id string) bool {
+ // Try to reserve the key
+ kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
+ if err := da.kvClient.RenewReservation(kvKey); err != nil {
+ log.Errorw("reservation-renewal-error", log.Fields{"error": err, "instance": da.instanceId})
+ return false
+ }
+ return true
+}
-startloop:
+func (da *DeviceOwnership) MonitorOwnership(id string, chnl chan int) {
+ op := "starting"
+ exit := false
+ ticker := time.NewTicker(time.Duration(da.reservationTimeout) / 3 * time.Second)
for {
- da.deviceMapLock.RLock()
- val, exist := da.deviceMap[id]
- da.deviceMapLock.RUnlock()
- if exist && val.owned {
+ select {
+ case <-da.exitChannel:
+ log.Infow("closing-monitoring", log.Fields{"Id": id})
+ exit = true
+ case <-ticker.C:
+ log.Debugw(fmt.Sprintf("%s-reservation", op), log.Fields{"Id": id})
+ case <-chnl:
+ log.Infow("closing-device-monitoring", log.Fields{"Id": id})
+ exit = true
+ }
+ if exit {
+ ticker.Stop()
+ break
+ }
+ deviceOwned, ownedByMe := da.getOwnership(id)
+ if deviceOwned && ownedByMe {
// Device owned; renew reservation
op = "renew"
- kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
- if err := da.kvClient.RenewReservation(kvKey); err != nil {
- log.Errorw("reservation-renewal-error", log.Fields{"error": err})
+ if da.renewReservation(id) {
+ log.Debugw("reservation-renewed", log.Fields{"id": id, "instanceId": da.instanceId})
+ } else {
+ log.Debugw("reservation-not-renewed", log.Fields{"id": id, "instanceId": da.instanceId})
}
} else {
- // Device not owned; try to seize ownership
+ // Device not owned or not owned by me; try to seize ownership
op = "retry"
if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
log.Errorw("unexpected-error", log.Fields{"error": err})
}
}
- select {
- case <-da.exitChannel:
- log.Infow("closing-monitoring", log.Fields{"Id": id})
- break startloop
- case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
- msg := fmt.Sprintf("%s-reservation", op)
- log.Infow(msg, log.Fields{"Id": id})
- case <-chnl:
- log.Infow("closing-device-monitoring", log.Fields{"Id": id})
- break startloop
- }
}
}
-func (da *DeviceOwnership) getOwnership(id string) bool {
+func (da *DeviceOwnership) getOwnership(id string) (bool, bool) {
da.deviceMapLock.RLock()
defer da.deviceMapLock.RUnlock()
if val, exist := da.deviceMap[id]; exist {
- return val.owned
+ return true, val.owned
}
- log.Debugw("setting-up-new-ownership", log.Fields{"Id": id})
- // Not owned by me or maybe anybody else. Try to reserve it
- reservedByMe := da.tryToReserveKey(id)
- myChnl := make(chan int)
- da.deviceMap[id] = &ownership{id: id, owned: reservedByMe, chnl: myChnl}
- go da.startOwnershipMonitoring(id, myChnl)
- return reservedByMe
+ return false, false
}
func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
@@ -146,8 +170,26 @@
// OwnedByMe returns where this Core instance active owns this device. This function will automatically
// trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(id string) bool {
- return da.getOwnership(id)
+func (da *DeviceOwnership) OwnedByMe(id interface{}) bool {
+ // Retrieve the ownership key based on the id
+ var ownershipKey string
+ var err error
+ if ownershipKey, err = da.getOwnershipKey(id); err != nil {
+ log.Warnw("no-ownershipkey", log.Fields{"error": err})
+ return false
+ }
+
+ deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
+ if deviceOwned {
+ return ownedByMe
+ }
+ // Not owned by me or maybe anybody else. Try to reserve it
+ reservedByMe := da.tryToReserveKey(ownershipKey)
+ myChnl := make(chan int)
+ da.deviceMap[ownershipKey] = &ownership{id: ownershipKey, owned: reservedByMe, chnl: myChnl}
+ log.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
+ go da.MonitorOwnership(ownershipKey, myChnl)
+ return reservedByMe
}
//AbandonDevice must be invoked whenever a device is deleted from the Core
@@ -163,3 +205,73 @@
}
return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
}
+
+//abandonAllDevices must be invoked whenever a device is deleted from the Core
+func (da *DeviceOwnership) abandonAllDevices() {
+ da.deviceMapLock.Lock()
+ defer da.deviceMapLock.Unlock()
+ for _, val := range da.deviceMap {
+ close(val.chnl)
+ }
+}
+
+func (da *DeviceOwnership) getDeviceKey(id string) (string, error) {
+ da.deviceToKeyMapLock.RLock()
+ defer da.deviceToKeyMapLock.RUnlock()
+ if val, exist := da.deviceToKeyMap[id]; exist {
+ return val, nil
+ }
+ return "", status.Error(codes.NotFound, fmt.Sprintf("not-present-%s", id))
+}
+
+func (da *DeviceOwnership) updateDeviceKey(id string, key string) error {
+ da.deviceToKeyMapLock.Lock()
+ defer da.deviceToKeyMapLock.Unlock()
+ if _, exist := da.deviceToKeyMap[id]; exist {
+ return status.Error(codes.AlreadyExists, fmt.Sprintf("already-present-%s", id))
+ }
+ da.deviceToKeyMap[id] = key
+ return nil
+}
+
+func (da *DeviceOwnership) getOwnershipKey(id interface{}) (string, error) {
+ if id == nil {
+ return "", status.Error(codes.InvalidArgument, "nil-id")
+ }
+ var device *voltha.Device
+ var lDevice *voltha.LogicalDevice
+ // The id can either be a device Id or a logical device id.
+ if dId, ok := id.(*utils.DeviceID); ok {
+ // Use cache if present
+ if val, err := da.getDeviceKey(dId.Id); err == nil {
+ return val, nil
+ }
+ if device, _ = da.deviceMgr.GetDevice(dId.Id); device == nil {
+ return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", dId))
+ }
+ if device.Root {
+ if err := da.updateDeviceKey(dId.Id, device.Id); err != nil {
+ log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.Id, "error": err})
+ }
+ return device.Id, nil
+ } else {
+ if err := da.updateDeviceKey(dId.Id, device.ParentId); err != nil {
+ log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.ParentId, "error": err})
+ }
+ return device.ParentId, nil
+ }
+ } else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
+ // Use cache if present
+ if val, err := da.getDeviceKey(ldId.Id); err == nil {
+ return val, nil
+ }
+ if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldId.Id); lDevice == nil {
+ return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", ldId))
+ }
+ if err := da.updateDeviceKey(ldId.Id, lDevice.RootDeviceId); err != nil {
+ log.Warnw("Error-updating-cache", log.Fields{"id": ldId.Id, "key": lDevice.RootDeviceId, "error": err})
+ }
+ return lDevice.RootDeviceId, nil
+ }
+ return "", status.Error(codes.NotFound, fmt.Sprintf("id-%s", id))
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d7834eb..1e5cc5b 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -25,6 +25,7 @@
"github.com/opencord/voltha-go/protos/common"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -40,14 +41,6 @@
)
-type deviceID struct {
- id string
-}
-
-type logicalDeviceID struct {
- id string
-}
-
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
@@ -61,20 +54,6 @@
core *Core
}
-//func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
-// handler := &APIHandler{
-// deviceMgr: deviceMgr,
-// logicalDeviceMgr: lDeviceMgr,
-// adapterMgr:adapterMgr,
-// coreInCompetingMode:inCompetingMode,
-// longRunningRequestTimeout:longRunningRequestTimeout,
-// defaultRequestTimeout:defaultRequestTimeout,
-// // TODO: Figure out what the 'hint' parameter to queue.New does
-// packetInQueue: queue.New(10),
-// }
-// return handler
-//}
-
func NewAPIHandler(core *Core) *APIHandler {
handler := &APIHandler{
deviceMgr: core.deviceMgr,
@@ -155,16 +134,16 @@
} else {
if id != nil {
// The id can either be a device Id or a logical device id.
- if dId, ok := id.(*deviceID); ok {
+ if dId, ok := id.(*utils.DeviceID); ok {
// Since this core has not processed this request, let's load the device, along with its extended
// family (parents and children) in memory. This will keep this core in-sync with its paired core as
// much as possible. The watch feature in the core model will ensure that the contents of those objects in
// memory are in sync.
time.Sleep(2 * time.Second)
- go handler.deviceMgr.load(dId.id)
- } else if ldId, ok := id.(*logicalDeviceID); ok {
+ go handler.deviceMgr.load(dId.Id)
+ } else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
// This will load the logical device along with its children and grandchildren
- go handler.logicalDeviceMgr.load(ldId.id)
+ go handler.logicalDeviceMgr.load(ldId.Id)
}
}
return nil, errors.New("failed-to-seize-request")
@@ -185,11 +164,7 @@
owned := false
if id != nil {
- if devId, ok := id.(*deviceID); ok {
- owned = handler.core.deviceOwnership.OwnedByMe(devId.id)
- } else if lDevId, ok := id.(*logicalDeviceID); ok {
- owned = handler.core.deviceOwnership.OwnedByMe(lDevId.id)
- }
+ owned = handler.core.deviceOwnership.OwnedByMe(id)
}
if owned {
if txn.Acquired(timeout) {
@@ -264,7 +239,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -285,7 +260,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -307,7 +282,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -330,7 +305,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -432,7 +407,7 @@
return &voltha.Device{}, err
}
if d, ok := res.(*voltha.Device); ok {
- handler.core.deviceOwnership.OwnedByMe(d.Id)
+ handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:d.Id})
return d, nil
}
}
@@ -453,7 +428,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -474,7 +449,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -495,7 +470,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -538,7 +513,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
@@ -629,7 +604,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 03edb57..a8e6a70 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -168,8 +168,8 @@
// GetLogicalDevice locks the logical device model and then retrieves the latest logical device information
func (agent *LogicalDeviceAgent) GetLogicalDevice() (*voltha.LogicalDevice, error) {
log.Debug("GetLogicalDevice")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
@@ -178,9 +178,9 @@
}
func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
- log.Debug("!!!!!ListLogicalDevicePorts")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ log.Debug("ListLogicalDevicePorts")
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
@@ -195,8 +195,8 @@
// listFlows locks the logical device model and then retrieves the latest flow information
func (agent *LogicalDeviceAgent) listFlows() []*ofp.OfpFlowStats {
log.Debug("listFlows")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.Flows.Items
@@ -207,8 +207,8 @@
// listFlowGroups locks the logical device model and then retrieves the latest flow groups information
func (agent *LogicalDeviceAgent) listFlowGroups() []*ofp.OfpGroupEntry {
log.Debug("listFlowGroups")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.FlowGroups.Items
@@ -240,7 +240,7 @@
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
+ //log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
return lDevice, nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
@@ -268,7 +268,6 @@
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- //var portCap *ic.PortCapability
var err error
var device *voltha.Device
@@ -278,7 +277,6 @@
}
//Get UNI port number
- //var uniPort uint32
changesMade := false
for _, port := range device.Ports {
if port.Type == voltha.Port_ETHERNET_NNI {
@@ -287,7 +285,6 @@
} else {
changesMade = true
}
- //uniPort = port.PortNo
}
}
if changesMade {
@@ -301,11 +298,9 @@
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- //var portCap *ic.PortCapability
var err error
//Get UNI port number
- //var uniPort uint32
changesMade := false
for _, port := range childDevice.Ports {
if port.Type == voltha.Port_ETHERNET_UNI {
@@ -314,36 +309,11 @@
} else {
changesMade = true
}
- //uniPort = port.PortNo
}
}
if changesMade {
go agent.setupDeviceGraph()
}
- //if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, uniPort); err != nil {
- // log.Errorw("error-creating-logical-port", log.Fields{"error": err})
- // return err
- //}
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
- //// Get stored logical device
- //if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
- // return status.Error(codes.NotFound, agent.logicalDeviceId)
- //} else {
- // log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- // portCap.Port.RootPort = false
- // //TODO: For now use the channel id assigned by the OLT as logical port number
- // lPortNo := childDevice.ProxyAddress.ChannelId
- // portCap.Port.Id = fmt.Sprintf("uni-%d", lPortNo)
- // portCap.Port.OfpPort.PortNo = lPortNo
- // portCap.Port.OfpPort.Name = portCap.Port.Id
- // portCap.Port.DeviceId = childDevice.Id
- // portCap.Port.DevicePortNo = uniPort
- // portCap.Port.DeviceId = childDevice.Id
- //
- // ldevice.Ports = append(ldevice.Ports, portCap.Port)
- // return agent.updateLogicalDeviceWithoutLock(ldevice)
- //}
return err
}
@@ -353,9 +323,6 @@
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
- //if a, ok := afterUpdate.(*voltha.LogicalDevice); ok {
- // log.Debugw("AFTER UPDATE", log.Fields{"logical": a})
- //}
return nil
}
@@ -1242,13 +1209,19 @@
func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) error {
- log.Infow("addNNILogicalPort", log.Fields{"NNI": port})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- if agent.portExist(device, port) {
- log.Debugw("port-already-exist", log.Fields{"port": port})
+ log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+ if device.AdminState != voltha.AdminState_ENABLED {
+ log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
return nil
}
+ agent.lockLogicalDevice.RLock()
+ if agent.portExist(device, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ agent.lockLogicalDevice.RUnlock()
+ return nil
+ }
+ agent.lockLogicalDevice.RUnlock()
+
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -1256,6 +1229,15 @@
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return err
}
+
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Double check again if this port has been already added since the getPortCapability could have taken a long time
+ if agent.portExist(device, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
portCap.Port.RootPort = true
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
lp.DeviceId = device.Id
@@ -1295,12 +1277,17 @@
func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) error {
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- if agent.portExist(childDevice, port) {
- log.Debugw("port-already-exist", log.Fields{"port": port})
+ if childDevice.AdminState != voltha.AdminState_ENABLED {
+ log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
return nil
}
+ agent.lockLogicalDevice.RLock()
+ if agent.portExist(childDevice, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ agent.lockLogicalDevice.RUnlock()
+ return nil
+ }
+ agent.lockLogicalDevice.RUnlock()
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -1308,8 +1295,13 @@
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return err
}
- //agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Double check again if this port has been already added since the getPortCapability could have taken a long time
+ if agent.portExist(childDevice, port) {
+ log.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
// Get stored logical device
if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
return status.Error(codes.NotFound, agent.logicalDeviceId)
@@ -1318,7 +1310,7 @@
portCap.Port.RootPort = false
portCap.Port.Id = port.Label
portCap.Port.OfpPort.PortNo = port.PortNo
- portCap.Port.OfpPort.Name = portCap.Port.Id
+ portCap.Port.OfpPort.Name = childDevice.SerialNumber
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = port.PortNo
cloned := (proto.Clone(ldevice)).(*voltha.LogicalDevice)
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 47249f6..5f572b1 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -91,16 +91,16 @@
// getLogicalDeviceAgent returns the logical device agent. If the device is not in memory then the device will
// be loaded from dB and a logical device agent created to managed it.
func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceId string) *LogicalDeviceAgent {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ ldMgr.lockLogicalDeviceAgentsMap.RLock()
if agent, ok := ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
- ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
return agent
} else {
// Try to load into memory - loading will also create the logical device agent
- ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
if err := ldMgr.load(logicalDeviceId); err == nil {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ ldMgr.lockLogicalDeviceAgentsMap.RLock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
if agent, ok = ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
return agent
}
@@ -147,20 +147,6 @@
return result, nil
}
-// List only logical devices that are in memory
-//func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
-// log.Debug("listLogicalDevices")
-// result := &voltha.LogicalDevices{}
-// ldMgr.lockLogicalDeviceAgentsMap.Lock()
-// defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
-// for _, agent := range ldMgr.logicalDeviceAgents {
-// if lDevice, err := agent.GetLogicalDevice(); err == nil {
-// result.Items = append(result.Items, lDevice)
-// }
-// }
-// return result, nil
-//}
-
func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
@@ -184,8 +170,8 @@
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.start(ctx, false)
- // Set device ownership
- ldMgr.core.deviceOwnership.OwnedByMe(id)
+ //// Set device ownership
+ //ldMgr.core.deviceOwnership.OwnedByMe(id)
log.Debug("creating-logical-device-ends")
return &id, nil
@@ -193,8 +179,7 @@
// load loads a logical device manager in memory
func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
- //log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
- log.Errorw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
// To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
// a create logical device callback from occurring at the same time.
ldMgr.lockLogicalDeviceAgentsMap.Lock()
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index ec7e4ca..12bf93e 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -46,7 +46,8 @@
SEIZED_BY_SELF
COMPLETED_BY_OTHER
ABANDONED_BY_OTHER
- STOPPED_WAITING_FOR_OTHER
+ STOPPED_WATCHING_KEY
+ STOPPED_WAITING_FOR_KEY
)
const (
@@ -69,7 +70,8 @@
"SEIZED-BY-SELF",
"COMPLETED-BY-OTHER",
"ABANDONED-BY-OTHER",
- "STOPPED-WAITING-FOR-OTHER"}
+ "STOPPED-WATCHING-KEY",
+ "STOPPED-WAITING-FOR-KEY"}
func init() {
log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -166,7 +168,7 @@
// Setting value to nil leads to watch mode
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil {
- log.Error("unexpected-owner-type")
+ log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
value = nil
}
}
@@ -177,16 +179,16 @@
// Another core instance has reserved the request
// Watch for reservation expiry or successful request completion
log.Debugw("watch-other-server",
- log.Fields{"owner": currOwner, "timeout": duration})
+ log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
res = c.Watch(duration)
}
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
- log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
+ log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
switch res {
- case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+ case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
acquired = true
default:
acquired = false
@@ -209,7 +211,6 @@
func (c *KVTransaction) Monitor(duration int64) bool {
var acquired bool
var res int
- var timeElapsed int64
// Convert milliseconds to seconds, rounding up
// The reservation TTL is specified in seconds
@@ -217,33 +218,15 @@
if remainder := duration % 1000; remainder > 0 {
durationInSecs++
}
- // Check if transaction key has been set
- keyExists := false
- for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- // This core has received the request before the core that actually
- // owns the device. The owning core has yet to seize the transaction.
- time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
- } else {
- keyExists = true
- log.Debug("waited-for-other-to-reserve-transaction")
- break
- }
- }
- if keyExists {
- // Watch for reservation expiry or successful request completion
- log.Debugw("watch-other-server", log.Fields{"timeout": duration})
- res = c.Watch(duration)
- } else {
- res = STOPPED_WAITING_FOR_OTHER
- }
+
+ res = c.Watch(duration)
+
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
- log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+ log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
switch res {
- case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+ case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
acquired = true
default:
acquired = false
@@ -266,18 +249,18 @@
// In case of missing events, let's check the transaction key
kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
if err == nil && kvp == nil {
- log.Debug("missed-deleted-event")
+ log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
res = ABANDONED_BY_OTHER
} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event",
- log.Fields{"key": c.txnKey, "value": val})
+ log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
res = COMPLETED_BY_OTHER
} else {
- res = STOPPED_WAITING_FOR_OTHER
+ log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
+ res = STOPPED_WATCHING_KEY
}
case event := <-events:
- log.Debugw("received-event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
if event.EventType == kvstore.DELETE {
// The other core failed to process the request
res = ABANDONED_BY_OTHER
@@ -296,19 +279,19 @@
}
func (c *KVTransaction) deleteTransactionKey() {
- log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
+ log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
+ log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
}
func (c *KVTransaction) Close() error {
- log.Debugw("close", log.Fields{"key": c.txnKey})
+ log.Debugw("close", log.Fields{"txn": c.txnId})
return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
}
func (c *KVTransaction) Delete() error {
- log.Debugw("delete", log.Fields{"key": c.txnKey})
+ log.Debugw("delete", log.Fields{"txn": c.txnId})
err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
return err
}
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index d7192da..58198de 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -66,6 +66,7 @@
logicalPorts []*voltha.LogicalPort
RootPorts map[uint32]uint32
Routes map[OFPortLink][]RouteHop
+ graphBuildLock sync.RWMutex
boundaryPorts sync.Map
}
@@ -73,6 +74,7 @@
var dg DeviceGraph
dg.GGraph = goraph.NewGraph()
dg.getDevice = getDevice
+ dg.graphBuildLock = sync.RWMutex{}
return &dg
}
@@ -80,6 +82,8 @@
if dg == nil {
return
}
+ dg.graphBuildLock.Lock()
+ defer dg.graphBuildLock.Unlock()
dg.logicalPorts = lps
// Set the root ports
dg.RootPorts = make(map[uint32]uint32)
@@ -215,6 +219,8 @@
}
func (dg *DeviceGraph) GetDeviceNodeIds() map[string]string {
+ dg.graphBuildLock.RLock()
+ defer dg.graphBuildLock.RUnlock()
nodeIds := make(map[string]string)
nodesMap := dg.GGraph.GetNodes()
for id, node := range nodesMap {
diff --git a/rw_core/main.go b/rw_core/main.go
index 92b16d2..2dadeb2 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -130,7 +130,7 @@
rw.kvClient,
rw.config.KVStoreTimeout,
rw.config.KVTxnKeyDelTime,
- 10); err != nil {
+ 1); err != nil {
log.Fatal("creating-transaction-context-failed")
}
}
@@ -229,7 +229,10 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
defer log.CleanUp()
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
new file mode 100644
index 0000000..1e1ed9f
--- /dev/null
+++ b/rw_core/utils/core_utils.go
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package utils
+
+type DeviceID struct {
+ Id string
+}
+
+type LogicalDeviceID struct {
+ Id string
+}