[VOL-1890] Flow decomposition fails after a core switchover
Cherry-picked into the master branch from voltha-2.1.
Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 71843ff..235aca5 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,15 +30,17 @@
)
type LogicalDeviceManager struct {
- logicalDeviceAgents sync.Map
- core *Core
- deviceMgr *DeviceManager
- grpcNbiHdlr *APIHandler
- adapterProxy *AdapterProxy
- kafkaICProxy *kafka.InterContainerProxy
- clusterDataProxy *model.Proxy
- exitChannel chan int
- defaultTimeout int64
+ logicalDeviceAgents sync.Map
+ core *Core
+ deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
+ adapterProxy *AdapterProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ defaultTimeout int64
+ logicalDevicesLoadingLock sync.RWMutex
+ logicalDeviceLoadingInProgress map[string][]chan int
}
func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
@@ -49,6 +51,8 @@
logicalDeviceMgr.kafkaICProxy = kafkaICProxy
logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.defaultTimeout = timeout
+ logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
+ logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
return &logicalDeviceMgr
}
@@ -205,23 +209,49 @@
// load loads a logical device manager in memory
func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
- log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
- // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
- // a create logical device callback from occurring at the same time.
- if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
- // Proceed with the loading only if the logical device exist in the Model (could have been deleted)
- if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
- // Create a temp logical device Agent and let it load from memory
- agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return err
- }
- ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
- }
+ if lDeviceId == "" {
+ return nil
}
- // TODO: load the child device
- return nil
+ // Add a lock to prevent two concurrent calls from loading the same device twice
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if _, exist := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; !exist {
+ if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = []chan int{make(chan int, 1)}
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ } else {
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
+ }
+ } else {
+ log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceId})
+ }
+ // announce completion of task to any number of waiting channels
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if v, ok := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(ldMgr.logicalDeviceLoadingInProgress, lDeviceId)
+ }
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ } else {
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ }
+ } else {
+ ch := make(chan int, 1)
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceId], ch)
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
+ }
+ if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceId); exist {
+ return nil
+ }
+ return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceId)
}
func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
@@ -416,11 +446,11 @@
log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
- if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
+ if parentId == "" || logDeviceId == "" {
return errors.New("device-in-invalid-state")
}
- if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
return err
}