[VOL-1890] Flow decomposition fails after a core switch over
This commit fixes the issue reported in VOL-1890 by doing
the following:
1) Update the device graph if the device graph was built
with an older logical device topology (this can happen in
the standby by core as currently the model does not trigger
a core callback after it updates its data in memory following
a watch event from the KV store - this is a TODO item).
2) Update flows in a logical device without using the data
model proxy to flows directly.
3) Reconcile the list of devices and logical devices in the
core memory after a restart upon receive a reconcile request
from the api server.
After first review:
1) Update the code as per the comments
2) Change the device loading channel to have two simultaneous
routines invoking the device loading for the same device to
return the same result (i.e. if it's a success then both
should return success and conversely in the failure case)
After second review:
Change the sync map controlling the device in loading state
to regular map.
Change-Id: I4331ac2a8f87a7de272e919a31dfe4bbaaf327a0
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index a61ca25..89b1fe9 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -95,18 +95,19 @@
log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
- log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+ log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceId})
} else {
// Add the initial device to the local model
if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ return status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
}
}
agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
- log.Debug("device-agent-started")
+ log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceId})
return nil
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f417b05..f5b017c 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,19 +33,21 @@
)
type DeviceManager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
- kafkaICProxy *kafka.InterContainerProxy
- stateTransitions *TransitionMap
- clusterDataProxy *model.Proxy
- coreInstanceId string
- exitChannel chan int
- defaultTimeout int64
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ core *Core
+ adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
+ logicalDeviceMgr *LogicalDeviceManager
+ kafkaICProxy *kafka.InterContainerProxy
+ stateTransitions *TransitionMap
+ clusterDataProxy *model.Proxy
+ coreInstanceId string
+ exitChannel chan int
+ defaultTimeout int64
+ devicesLoadingLock sync.RWMutex
+ deviceLoadingInProgress map[string][]chan int
}
func newDeviceManager(core *Core) *DeviceManager {
@@ -60,6 +62,8 @@
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+ deviceMgr.devicesLoadingLock = sync.RWMutex{}
+ deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
return &deviceMgr
}
@@ -98,7 +102,7 @@
}
-func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+func (dMgr *DeviceManager) deleteDeviceAgentFromMap(agent *DeviceAgent) {
dMgr.deviceAgents.Delete(agent.deviceId)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
@@ -220,7 +224,7 @@
}
if agent := dMgr.getDeviceAgent(id); agent != nil {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
// Abandon the device ownership
dMgr.core.deviceOwnership.AbandonDevice(id)
}
@@ -390,28 +394,52 @@
// loadDevice loads the deviceId in memory, if not present
func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
- log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
- // Sanity check
if deviceId == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
- if !dMgr.IsDeviceInCache(deviceId) {
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(deviceId); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return nil, err
+ var err error
+ var device *voltha.Device
+ dMgr.devicesLoadingLock.Lock()
+ if _, exist := dMgr.deviceLoadingInProgress[deviceId]; !exist {
+ if !dMgr.IsDeviceInCache(deviceId) {
+ dMgr.deviceLoadingInProgress[deviceId] = []chan int{make(chan int, 1)}
+ dMgr.devicesLoadingLock.Unlock()
+ // Proceed with the loading only if the device exist in the Model (could have been deleted)
+ if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ if err = agent.start(nil, true); err != nil {
+ log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ } else {
+ log.Debugw("Device not in model", log.Fields{"deviceId": deviceId})
}
- dMgr.addDeviceAgentToMap(agent)
+ // announce completion of task to any number of waiting channels
+ dMgr.devicesLoadingLock.Lock()
+ if v, ok := dMgr.deviceLoadingInProgress[deviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(dMgr.deviceLoadingInProgress, deviceId)
+ }
+ dMgr.devicesLoadingLock.Unlock()
} else {
- return nil, status.Error(codes.NotFound, deviceId)
+ dMgr.devicesLoadingLock.Unlock()
}
+ } else {
+ ch := make(chan int, 1)
+ dMgr.deviceLoadingInProgress[deviceId] = append(dMgr.deviceLoadingInProgress[deviceId], ch)
+ dMgr.devicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
}
- if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent, nil
+ if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+ return agent.(*DeviceAgent), nil
}
- return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+ return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceId)
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
@@ -431,7 +459,7 @@
if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
for _, childDeviceId := range childDeviceIds {
if _, err := dMgr.loadDevice(childDeviceId); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId, "error": err})
return err
}
}
@@ -490,35 +518,24 @@
return dMgr.listDeviceIdsFromMap(), nil
}
-//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+//ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
+//trigger loading the devices along with their children and parent in memory
func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
- log.Debug("ReconcileDevices")
+ log.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
var res interface{}
- if ids != nil {
+ if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
+ var err error
for _, id := range ids.Items {
- // Act on the device only if its not present in the agent map
- if !dMgr.IsDeviceInCache(id.Id) {
- // Device Id not in memory
- log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Proceed with the loading only if the device exist in the Model (could have been deleted)
- if device, err := dMgr.getDeviceFromModel(id.Id); err == nil {
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
- agent.stop(nil)
- } else {
- dMgr.addDeviceAgentToMap(agent)
- reconciled += 1
- }
- } else {
- reconciled += 1
- }
+ if err = dMgr.load(id.Id); err != nil {
+ log.Warnw("failure-reconciling-device", log.Fields{"deviceId": id.Id, "error": err})
+ } else {
+ reconciled += 1
}
}
if toReconcile != reconciled {
- res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+ res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
}
} else {
res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
@@ -1137,7 +1154,7 @@
allChildDeleted = false
} else {
agent.stop(nil)
- dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.deleteDeviceAgentFromMap(agent)
}
}
}
@@ -1369,12 +1386,12 @@
}
}
-func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
+func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) string {
if device, _ := dMgr.GetDevice(deviceId); device != nil {
log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
- return &device.ParentId
+ return device.ParentId
}
- return nil
+ return ""
}
func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 49e1463..65b3b30 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,6 +50,7 @@
portProxies map[string]*model.Proxy
portProxiesLock sync.RWMutex
lockLogicalDevice sync.RWMutex
+ lockDeviceGraph sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -71,6 +72,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
agent.lockLogicalPortsNo = sync.RWMutex{}
+ agent.lockDeviceGraph = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
return &agent
@@ -130,8 +132,6 @@
// Setup the local list of logical ports
agent.addLogicalPortsToMap(ld.Ports)
- // Setup the device graph
- agent.generateDeviceGraph()
}
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -161,6 +161,10 @@
return status.Error(codes.Internal, "logical-device-proxy-null")
}
+ // Setup the device graph - run it in its own routine
+ if loadFromdB {
+ go agent.generateDeviceGraph()
+ }
return nil
}
@@ -267,32 +271,53 @@
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Flows = flows
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Meters = meters
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.FlowGroups = flowGroups
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+ return err
}
return nil
}
@@ -507,12 +532,33 @@
return nil
}
+//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
+//that device graph was generated.
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+ if ld, err := agent.GetLogicalDevice(); err != nil {
+ log.Errorw("get-logical-device-error", log.Fields{"error": err})
+ return err
+ } else {
+ agent.lockDeviceGraph.Lock()
+ defer agent.lockDeviceGraph.Unlock()
+ if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+ return nil
+ }
+ log.Debug("Generation of device graph required")
+ agent.generateDeviceGraph()
+ }
+ return nil
+}
+
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
log.Debug("updateFlowTable")
if flow == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(flow)
@@ -535,6 +581,9 @@
if groupMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(groupMod)
@@ -553,6 +602,9 @@
if meterMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch meterMod.GetCommand() {
case ofp.OfpMeterModCommand_OFPMC_ADD:
return agent.meterAdd(meterMod)
@@ -765,7 +817,7 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
- log.Debug("flowAdd")
+ log.Debugw("flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
}
@@ -826,6 +878,8 @@
changed = true
}
}
+ log.Debugw("flowAdd-changed", log.Fields{"changed": changed})
+
if changed {
var flowMetadata voltha.FlowMetadata
if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
@@ -1500,14 +1554,14 @@
//generateDeviceGraph regenerates the device graph
func (agent *LogicalDeviceAgent) generateDeviceGraph() {
- log.Debugf("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
// Get the latest logical device
if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
} else {
- log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+ log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "lPorts": len(ld.Ports)})
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 71843ff..235aca5 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,15 +30,17 @@
)
type LogicalDeviceManager struct {
- logicalDeviceAgents sync.Map
- core *Core
- deviceMgr *DeviceManager
- grpcNbiHdlr *APIHandler
- adapterProxy *AdapterProxy
- kafkaICProxy *kafka.InterContainerProxy
- clusterDataProxy *model.Proxy
- exitChannel chan int
- defaultTimeout int64
+ logicalDeviceAgents sync.Map
+ core *Core
+ deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
+ adapterProxy *AdapterProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ defaultTimeout int64
+ logicalDevicesLoadingLock sync.RWMutex
+ logicalDeviceLoadingInProgress map[string][]chan int
}
func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
@@ -49,6 +51,8 @@
logicalDeviceMgr.kafkaICProxy = kafkaICProxy
logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.defaultTimeout = timeout
+ logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
+ logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
return &logicalDeviceMgr
}
@@ -205,23 +209,49 @@
// load loads a logical device manager in memory
func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
- log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
- // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
- // a create logical device callback from occurring at the same time.
- if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
- // Proceed with the loading only if the logical device exist in the Model (could have been deleted)
- if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
- // Create a temp logical device Agent and let it load from memory
- agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
- agent.stop(nil)
- return err
- }
- ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
- }
+ if lDeviceId == "" {
+ return nil
}
- // TODO: load the child device
- return nil
+ // Add a lock to prevent two concurrent calls from loading the same device twice
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if _, exist := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; !exist {
+ if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = []chan int{make(chan int, 1)}
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ } else {
+ ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
+ }
+ } else {
+ log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceId})
+ }
+ // announce completion of task to any number of waiting channels
+ ldMgr.logicalDevicesLoadingLock.Lock()
+ if v, ok := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; ok {
+ for _, ch := range v {
+ close(ch)
+ }
+ delete(ldMgr.logicalDeviceLoadingInProgress, lDeviceId)
+ }
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ } else {
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ }
+ } else {
+ ch := make(chan int, 1)
+ ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceId], ch)
+ ldMgr.logicalDevicesLoadingLock.Unlock()
+ // Wait for the channel to be closed, implying the process loading this device is done.
+ <-ch
+ }
+ if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceId); exist {
+ return nil
+ }
+ return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceId)
}
func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
@@ -416,11 +446,11 @@
log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
- if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
+ if parentId == "" || logDeviceId == "" {
return errors.New("device-in-invalid-state")
}
- if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+ if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
return err
}
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index ec6f8d2..357a709 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -210,6 +210,25 @@
return nil
}
+func (dg *DeviceGraph) IsUpToDate(ld *voltha.LogicalDevice) bool {
+ if ld != nil {
+ if len(dg.boundaryPorts) != len(ld.Ports) {
+ return false
+ }
+ var portId string
+ var val uint32
+ var exist bool
+ for _, lp := range ld.Ports {
+ portId = concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
+ if val, exist = dg.boundaryPorts[portId]; !exist || val != lp.OfpPort.PortNo {
+ return false
+ }
+ }
+ return true
+ }
+ return len(dg.boundaryPorts) == 0
+}
+
//getDevice returns the device either from the local cache (default) or from the model.
//TODO: Set a cache timeout such that we do not use invalid data. The full device lifecycle should also
//be taken in consideration