VOL-1775 VOL-1779 VOL-1780: Fix several issues with overall stability
- Apply changes as reported by golang race utility
- Added a version attribute to the KV object
- Added a context object to the db/model API
- Carrying timestamp info through the context to help in
decision-making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index ad4f362..257b707 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,34 +33,31 @@
)
type DeviceManager struct {
- deviceAgents map[string]*DeviceAgent
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
- stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- defaultTimeout int64
- lockDeviceAgentsMap sync.RWMutex
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ core *Core
+ adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
+ logicalDeviceMgr *LogicalDeviceManager
+ kafkaICProxy *kafka.InterContainerProxy
+ stateTransitions *TransitionMap
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ defaultTimeout int64
}
func newDeviceManager(core *Core) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
- deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
deviceMgr.coreInstanceId = core.instanceId
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
- deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
return &deviceMgr
@@ -92,12 +89,9 @@
}
func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- //defer dMgr.lockDeviceAgentsMap.Unlock()
- if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
- dMgr.deviceAgents[agent.deviceId] = agent
+ if _, exist := dMgr.deviceAgents.Load(agent.deviceId); !exist {
+ dMgr.deviceAgents.Store(agent.deviceId, agent)
}
- dMgr.lockDeviceAgentsMap.Unlock()
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
dMgr.rootDevices[agent.deviceId] = agent.isRootdevice
@@ -105,34 +99,25 @@
}
func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- //defer dMgr.lockDeviceAgentsMap.Unlock()
- delete(dMgr.deviceAgents, agent.deviceId)
- dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.deviceAgents.Delete(agent.deviceId)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
delete(dMgr.rootDevices, agent.deviceId)
-
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- dMgr.lockDeviceAgentsMap.RLock()
- if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- dMgr.lockDeviceAgentsMap.RUnlock()
- return agent
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent)
} else {
// Try to load into memory - loading will also create the device agent and set the device ownership
- dMgr.lockDeviceAgentsMap.RUnlock()
if err := dMgr.load(deviceId); err == nil {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- if agent, ok = dMgr.deviceAgents[deviceId]; !ok {
+ if agent, ok = dMgr.deviceAgents.Load(deviceId); !ok {
return nil
} else {
// Register this device for ownership tracking
go dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: deviceId})
- return agent
+ return agent.(*DeviceAgent)
}
} else {
//TODO: Change the return params to return an error as well
@@ -144,12 +129,13 @@
// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
- for key := range dMgr.deviceAgents {
- result.Items = append(result.Items, &voltha.ID{Id: key})
- }
+
+ dMgr.deviceAgents.Range(func(key, value interface{}) bool {
+ result.Items = append(result.Items, &voltha.ID{Id: key.(string)})
+ return true
+ })
+
return result
}
@@ -355,9 +341,7 @@
}
func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- _, exist := dMgr.deviceAgents[id]
+ _, exist := dMgr.deviceAgents.Load(id)
return exist
}
@@ -374,7 +358,7 @@
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
+ if devices := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
@@ -396,7 +380,7 @@
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *DeviceManager) getDeviceFromModel(deviceId string) (*voltha.Device, error) {
- if device := dMgr.clusterDataProxy.Get("/devices/"+deviceId, 0, false, ""); device != nil {
+ if device := dMgr.clusterDataProxy.Get(context.Background(), "/devices/"+deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
return d, nil
}
@@ -791,7 +775,7 @@
log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
return nil
}
- log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root})
+ log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
for _, handler := range handlers {
log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
if err := handler(current); err != nil {
@@ -1024,13 +1008,13 @@
func (dMgr *DeviceManager) getAllDeviceIdsWithDeviceParentId(id string) []string {
log.Debugw("getAllAgentsWithDeviceParentId", log.Fields{"parentDeviceId": id})
deviceIds := make([]string, 0)
- dMgr.lockDeviceAgentsMap.RLock()
- defer dMgr.lockDeviceAgentsMap.RUnlock()
- for deviceId, agent := range dMgr.deviceAgents {
+ dMgr.deviceAgents.Range(func(key, value interface{}) bool {
+ agent := value.(*DeviceAgent)
if agent.parentId == id {
- deviceIds = append(deviceIds, deviceId)
+ deviceIds = append(deviceIds, key.(string))
}
- }
+ return true
+ })
return deviceIds
}
@@ -1218,7 +1202,12 @@
}
func (dMgr *DeviceManager) NotifyInvalidTransition(pcDevice *voltha.Device) error {
- log.Errorw("NotifyInvalidTransition", log.Fields{"device": pcDevice.Id, "adminState": pcDevice.AdminState})
+ log.Errorw("NotifyInvalidTransition", log.Fields{
+ "device": pcDevice.Id,
+ "adminState": pcDevice.AdminState,
+ "operState": pcDevice.OperStatus,
+ "connState": pcDevice.ConnectStatus,
+ })
//TODO: notify over kafka?
return nil
}
@@ -1230,8 +1219,8 @@
}
func (dMgr *DeviceManager) UpdateDeviceAttribute(deviceId string, attribute string, value interface{}) {
- if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- agent.updateDeviceAttribute(attribute, value)
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ agent.(*DeviceAgent).updateDeviceAttribute(attribute, value)
}
}