VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Applied fixes for the issues reported by the Go race detector
- Added a version attribute to the KV object
- Added a context object to the db/model API
- Carried timestamp information through the context to inform the
decisions made when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 5d539aa..ac856df 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -119,8 +119,8 @@
aMgr.loadAdaptersAndDevicetypesInMemory()
//// Create the proxies
- aMgr.adapterProxy = aMgr.clusterDataProxy.CreateProxy("/adapters", false)
- aMgr.deviceTypeProxy = aMgr.clusterDataProxy.CreateProxy("/device_types", false)
+ aMgr.adapterProxy = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/adapters", false)
+ aMgr.deviceTypeProxy = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/device_types", false)
// Register the callbacks
aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
@@ -138,7 +138,7 @@
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
// Load the adapters
- if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
+ if adaptersIf := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, ""); adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -152,7 +152,7 @@
}
// Load the device types
- if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
+ if deviceTypesIf := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, ""); deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -171,7 +171,7 @@
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
// Update the adapters
- if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
+ if adaptersIf := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, ""); adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -180,7 +180,7 @@
}
}
// Update the device types
- if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
+ if deviceTypesIf := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, ""); deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -200,8 +200,8 @@
aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, nil)
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if kvAdapter := aMgr.clusterDataProxy.Get("/adapters/"+adapter.Id, 0, false, ""); kvAdapter == nil {
- if added := aMgr.clusterDataProxy.AddWithID("/adapters", adapter.Id, clonedAdapter, ""); added == nil {
+ if kvAdapter := aMgr.clusterDataProxy.Get(context.Background(), "/adapters/"+adapter.Id, 0, false, ""); kvAdapter == nil {
+ if added := aMgr.clusterDataProxy.AddWithID(context.Background(), "/adapters", adapter.Id, clonedAdapter, ""); added == nil {
//TODO: Errors when saving to KV would require a separate go routine to be launched and try the saving again
log.Errorw("failed-to-save-adapter", log.Fields{"adapter": adapter})
} else {
@@ -234,10 +234,10 @@
if saveToDb {
// Save the device types to the KV store as well
for _, deviceType := range deviceTypes.Items {
- if dType := aMgr.clusterDataProxy.Get("/device_types/"+deviceType.Id, 0, false, ""); dType == nil {
+ if dType := aMgr.clusterDataProxy.Get(context.Background(), "/device_types/"+deviceType.Id, 0, false, ""); dType == nil {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if added := aMgr.clusterDataProxy.AddWithID("/device_types", deviceType.Id, clonedDType, ""); added == nil {
+ if added := aMgr.clusterDataProxy.AddWithID(context.Background(), "/device_types", deviceType.Id, clonedDType, ""); added == nil {
log.Errorw("failed-to-save-deviceType", log.Fields{"deviceType": deviceType})
} else {
log.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 938d8e9..224b3cb 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -49,7 +49,7 @@
kvClient kvstore.Client
kafkaClient kafka.Client
coreMembership *voltha.Membership
- membershipLock *sync.RWMutex
+ membershipLock sync.RWMutex
deviceOwnership *DeviceOwnership
}
@@ -78,9 +78,8 @@
PathPrefix: cf.KVStoreDataPrefix}
core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
- core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
- core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
- core.membershipLock = &sync.RWMutex{}
+ core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
+ core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0ce1cce..0198254 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -29,6 +29,7 @@
"google.golang.org/grpc/status"
"reflect"
"sync"
+ "time"
)
type DeviceAgent struct {
@@ -85,7 +86,7 @@
defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
if loadFromdB {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.lastData = proto.Clone(d).(*voltha.Device)
agent.deviceType = agent.lastData.Adapter
@@ -97,12 +98,12 @@
log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
} else {
// Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+ if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
}
- agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
+ agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
log.Debug("device-agent-started")
@@ -115,7 +116,7 @@
defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
// Remove the device from the KV store
- if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
+ if removed := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceId, ""); removed == nil {
log.Debugw("device-already-removed", log.Fields{"id": agent.deviceId})
}
agent.exitChannel <- 1
@@ -127,7 +128,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.RLock()
defer agent.lockDevice.RUnlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, true, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -139,7 +140,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -186,7 +187,9 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -547,7 +550,8 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -574,7 +578,8 @@
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = adminState
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
@@ -626,7 +631,8 @@
// the device as well as its association with the logical device
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DELETED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -660,7 +666,8 @@
cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
}
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
// Send the request to the adapter
@@ -706,7 +713,8 @@
if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
// Set the device to Enabled
cloned.AdminState = voltha.AdminState_ENABLED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
// Send the request to teh adapter
@@ -744,7 +752,8 @@
}
// Set the device to downloading_image
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -781,7 +790,8 @@
image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
}
}
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -836,7 +846,8 @@
cloned.AdminState = voltha.AdminState_ENABLED
}
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
@@ -965,7 +976,8 @@
defer agent.lockDevice.Unlock()
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -975,7 +987,8 @@
func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -1002,7 +1015,8 @@
}
log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1022,7 +1036,8 @@
port.OperStatus = voltha.OperStatus_ACTIVE
}
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1043,7 +1058,8 @@
port.OperStatus = voltha.OperStatus_UNKNOWN
}
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1077,7 +1093,8 @@
}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1106,7 +1123,8 @@
cloned.Ports = []*voltha.Port{}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1125,7 +1143,8 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1163,7 +1182,8 @@
}
cloned.Ports = append(cloned.Ports, cp)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1191,7 +1211,8 @@
}
}
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1221,7 +1242,8 @@
}
// Store the device with updated peer ports
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1263,7 +1285,8 @@
log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
// Save the data
cloned := proto.Clone(storeDevice).(*voltha.Device)
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index ad4f362..257b707 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,34 +33,31 @@
)
type DeviceManager struct {
- deviceAgents map[string]*DeviceAgent
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
- stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- defaultTimeout int64
- lockDeviceAgentsMap sync.RWMutex
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ core *Core
+ adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
+ logicalDeviceMgr *LogicalDeviceManager
+ kafkaICProxy *kafka.InterContainerProxy
+ stateTransitions *TransitionMap
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ defaultTimeout int64
}
func newDeviceManager(core *Core) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
- deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
deviceMgr.coreInstanceId = core.instanceId
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
- deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
return &deviceMgr
@@ -92,12 +89,9 @@
}
func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- //defer dMgr.lockDeviceAgentsMap.Unlock()
- if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
- dMgr.deviceAgents[agent.deviceId] = agent
+ if _, exist := dMgr.deviceAgents.Load(agent.deviceId); !exist {
+ dMgr.deviceAgents.Store(agent.deviceId, agent)
}
- dMgr.lockDeviceAgentsMap.Unlock()
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
dMgr.rootDevices[agent.deviceId] = agent.isRootdevice
@@ -105,34 +99,25 @@
}
func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- //defer dMgr.lockDeviceAgentsMap.Unlock()
- delete(dMgr.deviceAgents, agent.deviceId)
- dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.deviceAgents.Delete(agent.deviceId)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
delete(dMgr.rootDevices, agent.deviceId)
-
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- dMgr.lockDeviceAgentsMap.RLock()
- if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- dMgr.lockDeviceAgentsMap.RUnlock()
- return agent
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent)
} else {
// Try to load into memory - loading will also create the device agent and set the device ownership
- dMgr.lockDeviceAgentsMap.RUnlock()
if err := dMgr.load(deviceId); err == nil {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- if agent, ok = dMgr.deviceAgents[deviceId]; !ok {
+ if agent, ok = dMgr.deviceAgents.Load(deviceId); !ok {
return nil
} else {
// Register this device for ownership tracking
go dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: deviceId})
- return agent
+ return agent.(*DeviceAgent)
}
} else {
//TODO: Change the return params to return an error as well
@@ -144,12 +129,13 @@
// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
- for key := range dMgr.deviceAgents {
- result.Items = append(result.Items, &voltha.ID{Id: key})
- }
+
+ dMgr.deviceAgents.Range(func(key, value interface{}) bool {
+ result.Items = append(result.Items, &voltha.ID{Id: key.(string)})
+ return true
+ })
+
return result
}
@@ -355,9 +341,7 @@
}
func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- _, exist := dMgr.deviceAgents[id]
+ _, exist := dMgr.deviceAgents.Load(id)
return exist
}
@@ -374,7 +358,7 @@
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
+ if devices := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
@@ -396,7 +380,7 @@
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *DeviceManager) getDeviceFromModel(deviceId string) (*voltha.Device, error) {
- if device := dMgr.clusterDataProxy.Get("/devices/"+deviceId, 0, false, ""); device != nil {
+ if device := dMgr.clusterDataProxy.Get(context.Background(), "/devices/"+deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
return d, nil
}
@@ -791,7 +775,7 @@
log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
return nil
}
- log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root})
+ log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
for _, handler := range handlers {
log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
if err := handler(current); err != nil {
@@ -1024,13 +1008,13 @@
func (dMgr *DeviceManager) getAllDeviceIdsWithDeviceParentId(id string) []string {
log.Debugw("getAllAgentsWithDeviceParentId", log.Fields{"parentDeviceId": id})
deviceIds := make([]string, 0)
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- for deviceId, agent := range dMgr.deviceAgents {
+ dMgr.deviceAgents.Range(func(key, value interface{}) bool {
+ agent := value.(*DeviceAgent)
if agent.parentId == id {
- deviceIds = append(deviceIds, deviceId)
+ deviceIds = append(deviceIds, key.(string))
}
- }
+ return true
+ })
return deviceIds
}
@@ -1218,7 +1202,12 @@
}
func (dMgr *DeviceManager) NotifyInvalidTransition(pcDevice *voltha.Device) error {
- log.Errorw("NotifyInvalidTransition", log.Fields{"device": pcDevice.Id, "adminState": pcDevice.AdminState})
+ log.Errorw("NotifyInvalidTransition", log.Fields{
+ "device": pcDevice.Id,
+ "adminState": pcDevice.AdminState,
+ "operState": pcDevice.OperStatus,
+ "connState": pcDevice.ConnectStatus,
+ })
//TODO: notify over kafka?
return nil
}
@@ -1230,8 +1219,8 @@
}
func (dMgr *DeviceManager) UpdateDeviceAttribute(deviceId string, attribute string, value interface{}) {
- if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- agent.updateDeviceAttribute(attribute, value)
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ agent.(*DeviceAgent).updateDeviceAttribute(attribute, value)
}
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 53faec8..70349d8 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -32,6 +32,7 @@
"google.golang.org/grpc/status"
"reflect"
"sync"
+ "time"
)
type LogicalDeviceAgent struct {
@@ -103,7 +104,7 @@
agent.lockLogicalDevice.Lock()
// Save the logical device
- if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
+ if added := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, ""); added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -135,12 +136,15 @@
defer agent.lockLogicalDevice.Unlock()
agent.flowProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
false)
agent.groupProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
false)
agent.ldProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
false)
@@ -162,7 +166,7 @@
defer agent.lockLogicalDevice.Unlock()
//Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ if removed := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -176,7 +180,7 @@
log.Debug("GetLogicalDevice")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
}
@@ -187,7 +191,7 @@
log.Debug("ListLogicalDeviceFlows")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
cFlows := (proto.Clone(lDevice.Flows)).(*ofp.Flows)
return cFlows, nil
@@ -199,7 +203,7 @@
log.Debug("ListLogicalDeviceFlowGroups")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
cFlowGroups := (proto.Clone(lDevice.FlowGroups)).(*ofp.FlowGroups)
return cFlowGroups, nil
@@ -211,7 +215,7 @@
log.Debug("ListLogicalDevicePorts")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
for _, port := range lDevice.Ports {
@@ -227,7 +231,7 @@
log.Debug("listFlows")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.Flows.Items
}
@@ -239,7 +243,7 @@
log.Debug("listFlowGroups")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.FlowGroups.Items
}
@@ -248,7 +252,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- afterUpdate := agent.flowProxy.Update("/", flows, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
}
@@ -257,7 +262,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- afterUpdate := agent.groupProxy.Update("/", flowGroups, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
}
@@ -268,7 +274,7 @@
// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDeviceWithoutLock")
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
//log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
return lDevice, nil
@@ -466,7 +472,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
@@ -1187,6 +1194,7 @@
// Set the proxy and callback for that port
agent.portProxiesLock.Lock()
agent.portProxies[port.Id] = agent.clusterDataProxy.CreateProxy(
+ context.Background(),
fmt.Sprintf("/logical_devices/%s/ports/%s", agent.logicalDeviceId, port.Id),
false)
agent.portProxies[port.Id].RegisterCallback(model.POST_UPDATE, agent.portUpdated)
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 96e3541..b871cd4 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,27 +30,24 @@
)
type LogicalDeviceManager struct {
- logicalDeviceAgents map[string]*LogicalDeviceAgent
- core *Core
- deviceMgr *DeviceManager
- grpcNbiHdlr *APIHandler
- adapterProxy *AdapterProxy
- kafkaICProxy *kafka.InterContainerProxy
- clusterDataProxy *model.Proxy
- exitChannel chan int
- lockLogicalDeviceAgentsMap sync.RWMutex
- defaultTimeout int64
+ logicalDeviceAgents sync.Map // logical device id (string) -> *LogicalDeviceAgent; replaces the mutex-guarded map
+ core *Core
+ deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
+ adapterProxy *AdapterProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ defaultTimeout int64
}
func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.core = core
logicalDeviceMgr.exitChannel = make(chan int, 1)
- logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
logicalDeviceMgr.deviceMgr = deviceMgr
logicalDeviceMgr.kafkaICProxy = kafkaICProxy
logicalDeviceMgr.clusterDataProxy = cdProxy
- logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
logicalDeviceMgr.defaultTimeout = timeout
return &logicalDeviceMgr
}
@@ -83,35 +80,26 @@
}
func (ldMgr *LogicalDeviceManager) addLogicalDeviceAgentToMap(agent *LogicalDeviceAgent) {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- if _, exist := ldMgr.logicalDeviceAgents[agent.logicalDeviceId]; !exist {
- ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ if _, exist := ldMgr.logicalDeviceAgents.LoadOrStore(agent.logicalDeviceId, agent); exist { // atomic insert-if-absent: a separate Load then Store lets two callers both see "absent" and overwrite each other
+ log.Debugw("logical-device-agent-already-in-map", log.Fields{"lDeviceId": agent.logicalDeviceId}) // the existing agent is kept, as before
}
}
func (ldMgr *LogicalDeviceManager) isLogicalDeviceInCache(logicalDeviceId string) bool {
- ldMgr.lockLogicalDeviceAgentsMap.RLock()
- defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
- _, inCache := ldMgr.logicalDeviceAgents[logicalDeviceId]
+ _, inCache := ldMgr.logicalDeviceAgents.Load(logicalDeviceId) // second result of Load reports whether the id is present
return inCache
}
// getLogicalDeviceAgent returns the logical device agent. If the device is not in memory then the device will
// be loaded from dB and a logical device agent created to managed it.
func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceId string) *LogicalDeviceAgent {
- ldMgr.lockLogicalDeviceAgentsMap.RLock()
- if agent, ok := ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
- ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
- return agent
+ if agent, ok := ldMgr.logicalDeviceAgents.Load(logicalDeviceId); ok {
+ return agent.(*LogicalDeviceAgent)
} else {
// Try to load into memory - loading will also create the logical device agent
- ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
if err := ldMgr.load(logicalDeviceId); err == nil {
- ldMgr.lockLogicalDeviceAgentsMap.RLock()
- defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
- if agent, ok = ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
- return agent
+ if agent, ok = ldMgr.logicalDeviceAgents.Load(logicalDeviceId); ok {
+ return agent.(*LogicalDeviceAgent)
}
}
}
@@ -119,9 +107,7 @@
}
func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+ ldMgr.logicalDeviceAgents.Delete(logicalDeviceId) // only removes the map entry; this function does not stop the agent
}
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
@@ -137,20 +123,21 @@
func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("listManagedLogicalDevices")
result := &voltha.LogicalDevices{}
- ldMgr.lockLogicalDeviceAgentsMap.RLock()
- defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
- for _, agent := range ldMgr.logicalDeviceAgents {
+ ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool { // visit each agent held in the map
+ agent := value.(*LogicalDeviceAgent)
if ld, _ := agent.GetLogicalDevice(); ld != nil {
result.Items = append(result.Items, ld)
}
- }
+ return true // returning true tells Range to continue with the next entry
+ })
+
return result, nil
}
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
+ if logicalDevices := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, false, ""); logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
agent = newLogicalDeviceAgent(
@@ -203,23 +190,23 @@
func (ldMgr *LogicalDeviceManager) stopManagingLogicalDeviceWithDeviceId(id string) string {
log.Infow("stop-managing-logical-device", log.Fields{"deviceId": id})
// Go over the list of logical device agents to find the one which has rootDeviceId as id
- ldMgr.lockLogicalDeviceAgentsMap.RLock()
- defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
- for ldId, ldAgent := range ldMgr.logicalDeviceAgents {
+ var ldId = "" // returned as-is ("") when no agent has id as its root device
+ ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool {
+ ldAgent := value.(*LogicalDeviceAgent)
if ldAgent.rootDeviceId == id {
- log.Infow("stopping-logical-device-agent", log.Fields{"lDeviceId": ldId})
+ log.Infow("stopping-logical-device-agent", log.Fields{"lDeviceId": key})
ldAgent.stop(nil)
- delete(ldMgr.logicalDeviceAgents, ldId)
- return ldId
+ ldId = key.(string) // record the matched id first; deleting before this assignment would use the empty string as key
+ ldMgr.logicalDeviceAgents.Delete(ldId) // sync.Map allows Delete from inside Range
}
- }
- return ""
+ return true // keep scanning the remaining entries
+ })
+ return ldId
}
//getLogicalDeviceFromModel retrieves the logical device data from the model.
func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(lDeviceId string) (*voltha.LogicalDevice, error) {
-
- if logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+lDeviceId, 0, false, ""); logicalDevice != nil {
+ if logicalDevice := ldMgr.clusterDataProxy.Get(context.Background(), "/logical_devices/"+lDeviceId, 0, false, ""); logicalDevice != nil {
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
}
@@ -232,9 +219,7 @@
log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
// To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
// a create logical device callback from occurring at the same time.
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+ if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
// Proceed with the loading only if the logical device exist in the Model (could have been deleted)
if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
// Create a temp logical device Agent and let it load from memory
@@ -243,7 +228,7 @@
agent.stop(nil)
return err
}
- ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
}
}
// TODO: load the child device
@@ -430,7 +415,7 @@
}
func (ldMgr *LogicalDeviceManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
- log.Debugw("setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId})
+ log.Debugw("setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -473,7 +458,7 @@
}
func (ldMgr *LogicalDeviceManager) updatePortsState(device *voltha.Device, state voltha.AdminState_AdminState) error {
- log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state})
+ log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
var ldId *string
var err error