[VOL-1890] Flow decomposition fails after a core switch over
Cherry Pick into the master branch from voltha-2.1
Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index a61ca25..89b1fe9 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -95,18 +95,19 @@
log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
- log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+ log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceId})
} else {
// Add the initial device to the local model
if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ return status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
}
}
agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
- log.Debug("device-agent-started")
+ log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceId})
return nil
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f417b05..f5b017c 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,19 +33,21 @@
)
type DeviceManager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
- stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- defaultTimeout int64
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ core *Core
+ adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
+ logicalDeviceMgr *LogicalDeviceManager
+ kafkaICProxy *kafka.InterContainerProxy
+ stateTransitions *TransitionMap
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ defaultTimeout int64
+ devicesLoadingLock sync.RWMutex
+ deviceLoadingInProgress map[string][]chan int
}
func newDeviceManager(core *Core) *DeviceManager {
@@ -60,6 +62,8 @@
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+ deviceMgr.devicesLoadingLock = sync.RWMutex{}
+ deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
return &deviceMgr
}
@@ -98,7 +102,7 @@
}
-func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+func (dMgr *DeviceManager) deleteDeviceAgentFromMap(agent *DeviceAgent) {
dMgr.deviceAgents.Delete(agent.deviceId)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
@@ -220,7 +224,7 @@
}
if agent := dMgr.getDeviceAgent(id); agent != nil {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
// Abandon the device ownership
dMgr.core.deviceOwnership.AbandonDevice(id)
}
@@ -390,28 +394,52 @@
// loadDevice loads the deviceId in memory, if not present
func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
- log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
- // Sanity check
if deviceId == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
- if !dMgr.IsDeviceInCache(deviceId) {
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(deviceId); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return nil, err
+ var err error
+ var device *voltha.Device
+ dMgr.devicesLoadingLock.Lock()
+ if _, exist := dMgr.deviceLoadingInProgress[deviceId]; !exist {
+ if !dMgr.IsDeviceInCache(deviceId) {
+ dMgr.deviceLoadingInProgress[deviceId] = []chan int{make(chan int, 1)}
+ dMgr.devicesLoadingLock.Unlock()
+ // Proceed with the loading only if the device exist in the Model (could have been deleted)
+ if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ if err = agent.start(nil, true); err != nil {
+ log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ } else {
+ log.Debugw("Device not in model", log.Fields{"deviceId": deviceId})
}
- dMgr.addDeviceAgentToMap(agent)
+ // announce completion of task to any number of waiting channels
+ dMgr.devicesLoadingLock.Lock()
+ if v, ok := dMgr.deviceLoadingInProgress[deviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(dMgr.deviceLoadingInProgress, deviceId)
+ }
+ dMgr.devicesLoadingLock.Unlock()
} else {
- return nil, status.Error(codes.NotFound, deviceId)
+ dMgr.devicesLoadingLock.Unlock()
}
+ } else {
+ ch := make(chan int, 1)
+ dMgr.deviceLoadingInProgress[deviceId] = append(dMgr.deviceLoadingInProgress[deviceId], ch)
+ dMgr.devicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
}
- if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent, nil
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent), nil
}
- return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+ return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceId)
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
@@ -431,7 +459,7 @@
if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
for _, childDeviceId := range childDeviceIds {
if _, err := dMgr.loadDevice(childDeviceId); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId, "error": err})
return err
}
}
@@ -490,35 +518,24 @@
return dMgr.listDeviceIdsFromMap(), nil
}
-//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+//ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
+//trigger loading the devices along with their children and parent in memory
func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
- log.Debug("ReconcileDevices")
+ log.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
var res interface{}
- if ids != nil {
+ if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
+ var err error
for _, id := range ids.Items {
- // Act on the device only if its not present in the agent map
- if !dMgr.IsDeviceInCache(id.Id) {
- // Device Id not in memory
- log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(id.Id); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
- agent.stop(nil)
- } else {
- dMgr.addDeviceAgentToMap(agent)
- reconciled += 1
- }
- } else {
- reconciled += 1
- }
+ if err = dMgr.load(id.Id); err != nil {
+ log.Warnw("failure-reconciling-device", log.Fields{"deviceId": id.Id, "error": err})
+ } else {
+ reconciled += 1
}
}
if toReconcile != reconciled {
- res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+ res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
}
} else {
res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
@@ -1137,7 +1154,7 @@
allChildDeleted = false
} else {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
}
}
}
@@ -1369,12 +1386,12 @@
}
}
-func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
+func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) string {
if device, _ := dMgr.GetDevice(deviceId); device != nil {
log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
- return &device.ParentId
+ return device.ParentId
}
- return nil
+ return ""
}
func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 49e1463..65b3b30 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,6 +50,7 @@
portProxies map[string]*model.Proxy
portProxiesLock sync.RWMutex
lockLogicalDevice sync.RWMutex
+ lockDeviceGraph sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -71,6 +72,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
agent.lockLogicalPortsNo = sync.RWMutex{}
+ agent.lockDeviceGraph = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
return &agent
@@ -130,8 +132,6 @@
// Setup the local list of logical ports
agent.addLogicalPortsToMap(ld.Ports)
- // Setup the device graph
- agent.generateDeviceGraph()
}
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -161,6 +161,10 @@
return status.Error(codes.Internal, "logical-device-proxy-null")
}
+ // Setup the device graph - run it in its own routine
+ if loadFromdB {
+ go agent.generateDeviceGraph()
+ }
return nil
}
@@ -267,32 +271,53 @@
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Flows = flows
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Meters = meters
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.FlowGroups = flowGroups
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+ return err
}
return nil
}
@@ -507,12 +532,33 @@
return nil
}
+//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
+//that device graph was generated.
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+ if ld, err := agent.GetLogicalDevice(); err != nil {
+ log.Errorw("get-logical-device-error", log.Fields{"error": err})
+ return err
+ } else {
+ agent.lockDeviceGraph.Lock()
+ defer agent.lockDeviceGraph.Unlock()
+ if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+ return nil
+ }
+ log.Debug("Generation of device graph required")
+ agent.generateDeviceGraph()
+ }
+ return nil
+}
+
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
log.Debug("updateFlowTable")
if flow == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(flow)
@@ -535,6 +581,9 @@
if groupMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(groupMod)
@@ -553,6 +602,9 @@
if meterMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch meterMod.GetCommand() {
case ofp.OfpMeterModCommand_OFPMC_ADD:
return agent.meterAdd(meterMod)
@@ -765,7 +817,7 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
- log.Debug("flowAdd")
+ log.Debugw("flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
}
@@ -826,6 +878,8 @@
changed = true
}
}
+ log.Debugw("flowAdd-changed", log.Fields{"changed": changed})
+
if changed {
var flowMetadata voltha.FlowMetadata
if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
@@ -1500,14 +1554,14 @@
//generateDeviceGraph regenerates the device graph
func (agent *LogicalDeviceAgent) generateDeviceGraph() {
- log.Debugf("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
// Get the latest logical device
if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
} else {
- log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+ log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "lPorts": len(ld.Ports)})
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 71843ff..235aca5 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,15 +30,17 @@
)
type LogicalDeviceManager struct {
- logicalDeviceAgents sync.Map
- core *Core
- deviceMgr *DeviceManager
- grpcNbiHdlr *APIHandler
- adapterProxy *AdapterProxy
- kafkaICProxy *kafka.InterContainerProxy
- clusterDataProxy *model.Proxy
- exitChannel chan int
- defaultTimeout int64
+ logicalDeviceAgents sync.Map
+ core *Core
+ deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
+ adapterProxy *AdapterProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ defaultTimeout int64
+ logicalDevicesLoadingLock sync.RWMutex
+ logicalDeviceLoadingInProgress map[string][]chan int
}
func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
@@ -49,6 +51,8 @@
logicalDeviceMgr.kafkaICProxy = kafkaICProxy
logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.defaultTimeout = timeout
+ logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
+ logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
return &logicalDeviceMgr
}
@@ -205,23 +209,49 @@
// load loads a logical device manager in memory
func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
- log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
- // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
- // a create logical device callback from occurring at the same time.
- if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
- // Proceed with the loading only if the logical device exist in the Model (could have been deleted)
- if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
- // Create a temp logical device Agent and let it load from memory
- agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return err
- }
- ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
- }
+ if lDeviceId == "" {
+ return nil
}
- // TODO: load the child device
- return nil
+ // Add a lock to prevent two concurrent calls from loading the same device twice
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if _, exist := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; !exist {
+ if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = []chan int{make(chan int, 1)}
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ } else {
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
+ }
+ } else {
+ log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceId})
+ }
+ // announce completion of task to any number of waiting channels
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if v, ok := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(ldMgr.logicalDeviceLoadingInProgress, lDeviceId)
+ }
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ } else {
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ }
+ } else {
+ ch := make(chan int, 1)
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceId], ch)
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
+ }
+ if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceId); exist {
+ return nil
+ }
+ return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceId)
}
func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
@@ -416,11 +446,11 @@
log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
- if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
+ if parentId == "" || logDeviceId == "" {
return errors.New("device-in-invalid-state")
}
- if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
return err
}
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index ec6f8d2..357a709 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -210,6 +210,25 @@
return nil
}
+func (dg *DeviceGraph) IsUpToDate(ld *voltha.LogicalDevice) bool {
+ if ld != nil {
+ if len(dg.boundaryPorts) != len(ld.Ports) {
+ return false
+ }
+ var portId string
+ var val uint32
+ var exist bool
+ for _, lp := range ld.Ports {
+ portId = concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
+ if val, exist = dg.boundaryPorts[portId]; !exist || val != lp.OfpPort.PortNo {
+ return false
+ }
+ }
+ return true
+ }
+ return len(dg.boundaryPorts) == 0
+}
+
//getDevice returns the device either from the local cache (default) or from the model.
//TODO: Set a cache timeout such that we do not use invalid data. The full device lifecycle should also
//be taken in consideration