[VOL-1890] Flow decomposition fails after a core switchover
Cherry-picked into the master branch from voltha-2.1.
Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f417b05..f5b017c 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,19 +33,21 @@
)
type DeviceManager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
- stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- defaultTimeout int64
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ core *Core
+ adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
+ logicalDeviceMgr *LogicalDeviceManager
+ kafkaICProxy *kafka.InterContainerProxy
+ stateTransitions *TransitionMap
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ defaultTimeout int64
+ devicesLoadingLock sync.RWMutex
+ deviceLoadingInProgress map[string][]chan int
}
func newDeviceManager(core *Core) *DeviceManager {
@@ -60,6 +62,8 @@
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+ deviceMgr.devicesLoadingLock = sync.RWMutex{}
+ deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
return &deviceMgr
}
@@ -98,7 +102,7 @@
}
-func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+func (dMgr *DeviceManager) deleteDeviceAgentFromMap(agent *DeviceAgent) {
dMgr.deviceAgents.Delete(agent.deviceId)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
@@ -220,7 +224,7 @@
}
if agent := dMgr.getDeviceAgent(id); agent != nil {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
// Abandon the device ownership
dMgr.core.deviceOwnership.AbandonDevice(id)
}
@@ -390,28 +394,52 @@
// loadDevice loads the deviceId in memory, if not present
func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
- log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
- // Sanity check
if deviceId == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
- if !dMgr.IsDeviceInCache(deviceId) {
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(deviceId); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return nil, err
+ var err error
+ var device *voltha.Device
+ dMgr.devicesLoadingLock.Lock()
+ if _, exist := dMgr.deviceLoadingInProgress[deviceId]; !exist {
+ if !dMgr.IsDeviceInCache(deviceId) {
+ dMgr.deviceLoadingInProgress[deviceId] = []chan int{make(chan int, 1)}
+ dMgr.devicesLoadingLock.Unlock()
+ // Proceed with the loading only if the device exist in the Model (could have been deleted)
+ if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ if err = agent.start(nil, true); err != nil {
+ log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ } else {
+ log.Debugw("Device not in model", log.Fields{"deviceId": deviceId})
}
- dMgr.addDeviceAgentToMap(agent)
+ // announce completion of task to any number of waiting channels
+ dMgr.devicesLoadingLock.Lock()
+ if v, ok := dMgr.deviceLoadingInProgress[deviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(dMgr.deviceLoadingInProgress, deviceId)
+ }
+ dMgr.devicesLoadingLock.Unlock()
} else {
- return nil, status.Error(codes.NotFound, deviceId)
+ dMgr.devicesLoadingLock.Unlock()
}
+ } else {
+ ch := make(chan int, 1)
+ dMgr.deviceLoadingInProgress[deviceId] = append(dMgr.deviceLoadingInProgress[deviceId], ch)
+ dMgr.devicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
}
- if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent, nil
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent), nil
}
- return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+ return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceId)
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
@@ -431,7 +459,7 @@
if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
for _, childDeviceId := range childDeviceIds {
if _, err := dMgr.loadDevice(childDeviceId); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId, "error": err})
return err
}
}
@@ -490,35 +518,24 @@
return dMgr.listDeviceIdsFromMap(), nil
}
-//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+//ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
+//trigger loading the devices along with their children and parent in memory
func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
- log.Debug("ReconcileDevices")
+ log.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.GetItems())}) // GetItems is nil-safe: ids is only nil-checked below
var res interface{}
- if ids != nil {
+ if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
+ var err error
for _, id := range ids.Items {
- // Act on the device only if its not present in the agent map
- if !dMgr.IsDeviceInCache(id.Id) {
- // Device Id not in memory
- log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(id.Id); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
- agent.stop(nil)
- } else {
- dMgr.addDeviceAgentToMap(agent)
- reconciled += 1
- }
- } else {
- reconciled += 1
- }
+ if err = dMgr.load(id.Id); err != nil {
+ log.Warnw("failure-reconciling-device", log.Fields{"deviceId": id.Id, "error": err})
+ } else {
+ reconciled += 1
}
}
if toReconcile != reconciled {
- res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+ res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
}
} else {
res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
@@ -1137,7 +1154,7 @@
allChildDeleted = false
} else {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
}
}
}
@@ -1369,12 +1386,12 @@
}
}
-func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
+func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) string {
if device, _ := dMgr.GetDevice(deviceId); device != nil {
log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
- return &device.ParentId
+ return device.ParentId
}
- return nil
+ return ""
}
func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {