VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Apply changes as reported by golang race utility
- Added version attribute in KV object
- Added context object to db/model api
- Carrying timestamp info through context to help in the
decision making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index 90aadae..54cd455 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -66,8 +66,8 @@
PathPrefix: "service/voltha"}
core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
- core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
- core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+ core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
+ core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
}
diff --git a/ro_core/core/device_agent.go b/ro_core/core/device_agent.go
index dfaf767..a64931c 100644
--- a/ro_core/core/device_agent.go
+++ b/ro_core/core/device_agent.go
@@ -56,7 +56,7 @@
defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
if loadFromDb {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.lastData = proto.Clone(d).(*voltha.Device)
}
@@ -83,7 +83,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -95,7 +95,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
index f3a1f6c..90c7822 100644
--- a/ro_core/core/device_manager.go
+++ b/ro_core/core/device_manager.go
@@ -26,21 +26,18 @@
)
type DeviceManager struct {
- deviceAgents map[string]*DeviceAgent
- logicalDeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- lockDeviceAgentsMap sync.RWMutex
+ deviceAgents sync.Map
+ logicalDeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
}
func newDeviceManager(cdProxy *model.Proxy, coreInstanceId string) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.exitChannel = make(chan int, 1)
- deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
deviceMgr.coreInstanceId = coreInstanceId
deviceMgr.clusterDataProxy = cdProxy
- deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
@@ -69,32 +66,23 @@
}
func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
- if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
- dMgr.deviceAgents[agent.deviceId] = agent
+ if _, exist := dMgr.deviceAgents.Load(agent.deviceId); !exist {
+ dMgr.deviceAgents.Store(agent.deviceId, agent)
}
}
func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
- delete(dMgr.deviceAgents, agent.deviceId)
+ dMgr.deviceAgents.Delete(agent.deviceId)
}
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- dMgr.lockDeviceAgentsMap.Lock()
- if agent, ok := dMgr.deviceAgents[deviceId]; ok {
- dMgr.lockDeviceAgentsMap.Unlock()
- return agent
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent)
} else {
// Try to load into memory - loading will also create the device agent
- dMgr.lockDeviceAgentsMap.Unlock()
if err := dMgr.load(deviceId); err == nil {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
- if agent, ok = dMgr.deviceAgents[deviceId]; ok {
- return agent
+ if agent, ok = dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent)
}
}
}
@@ -103,12 +91,11 @@
// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
- for key, _ := range dMgr.deviceAgents {
- result.Items = append(result.Items, &voltha.ID{Id: key})
- }
+ dMgr.deviceAgents.Range(func(key, value interface{}) bool {
+ result.Items = append(result.Items, &voltha.ID{Id: key.(string)})
+ return true
+ })
return result
}
@@ -122,9 +109,7 @@
}
func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
- dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
- _, exist := dMgr.deviceAgents[id]
+ _, exist := dMgr.deviceAgents.Load(id)
return exist
}
@@ -140,7 +125,7 @@
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
+ if devices := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
diff --git a/ro_core/core/logical_device_agent.go b/ro_core/core/logical_device_agent.go
index 1357bd4..d1c8887 100644
--- a/ro_core/core/logical_device_agent.go
+++ b/ro_core/core/logical_device_agent.go
@@ -57,7 +57,7 @@
if loadFromDb {
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
- if logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, ""); logicalDevice != nil {
+ if logicalDevice := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceId, 0, false, ""); logicalDevice != nil {
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
agent.lastData = proto.Clone(lDevice).(*voltha.LogicalDevice)
}
@@ -85,8 +85,7 @@
log.Debug("GetLogicalDevice")
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- if logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false,
- ""); logicalDevice != nil {
+ if logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, ""); logicalDevice != nil {
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
}
diff --git a/ro_core/core/logical_device_manager.go b/ro_core/core/logical_device_manager.go
index 05b494a..db220d5 100644
--- a/ro_core/core/logical_device_manager.go
+++ b/ro_core/core/logical_device_manager.go
@@ -26,7 +26,7 @@
)
type LogicalDeviceManager struct {
- logicalDeviceAgents map[string]*LogicalDeviceAgent
+ logicalDeviceAgents sync.Map
deviceMgr *DeviceManager
grpcNbiHdlr *APIHandler
clusterDataProxy *model.Proxy
@@ -37,7 +37,6 @@
func newLogicalDeviceManager(deviceMgr *DeviceManager, cdProxy *model.Proxy) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.exitChannel = make(chan int, 1)
- logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
logicalDeviceMgr.deviceMgr = deviceMgr
logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
@@ -60,26 +59,21 @@
}
func (ldMgr *LogicalDeviceManager) addLogicalDeviceAgentToMap(agent *LogicalDeviceAgent) {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- if _, exist := ldMgr.logicalDeviceAgents[agent.logicalDeviceId]; !exist {
- ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceId); !exist {
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
}
}
func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(logicalDeviceId string) *LogicalDeviceAgent {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- if agent, ok := ldMgr.logicalDeviceAgents[logicalDeviceId]; ok {
- return agent
+ if agent, ok := ldMgr.logicalDeviceAgents.Load(logicalDeviceId); ok {
+ //return agent
+ return agent.(*LogicalDeviceAgent)
}
return nil
}
func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+ ldMgr.logicalDeviceAgents.Delete(logicalDeviceId)
}
// GetLogicalDevice provides a cloned most up to date logical device
@@ -94,14 +88,15 @@
func (ldMgr *LogicalDeviceManager) IsLogicalDeviceInCache(id string) bool {
ldMgr.lockLogicalDeviceAgentsMap.Lock()
defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- _, exist := ldMgr.logicalDeviceAgents[id]
+ _, exist := ldMgr.logicalDeviceAgents.Load(id)
return exist
}
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
+ if logicalDevices := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, false,
+ ""); logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
// If device is not in memory then set it up
if !ldMgr.IsLogicalDeviceInCache(logicalDevice.(*voltha.LogicalDevice).Id) {
@@ -130,16 +125,14 @@
log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
// To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
// a create logical device callback from occurring at the same time.
- ldMgr.lockLogicalDeviceAgentsMap.Lock()
- defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
- if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+ if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
// Logical device not in memory - create a temp logical device Agent and let it load from memory
agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
if err := agent.start(nil, true); err != nil {
agent.stop(nil)
return err
}
- ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
}
// TODO: load the child device
return nil
diff --git a/ro_core/core/model_proxy.go b/ro_core/core/model_proxy.go
index f5e6c3b..473e579 100644
--- a/ro_core/core/model_proxy.go
+++ b/ro_core/core/model_proxy.go
@@ -16,6 +16,7 @@
package core
import (
+ "context"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"google.golang.org/grpc/codes"
@@ -55,7 +56,7 @@
log.Debugw("get-data", log.Fields{"path": path})
- if data := mp.rootProxy.Get(path, 1, false, ""); data != nil {
+ if data := mp.rootProxy.Get(context.Background(), path, 1, false, ""); data != nil {
return data, nil
}
return nil, status.Errorf(codes.NotFound, "data-path: %s", path)