[VOL-1890] Flow decomposition fails after a core switchover
Cherry-picked into the master branch from voltha-2.1
Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 49e1463..65b3b30 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,6 +50,7 @@
portProxies map[string]*model.Proxy
portProxiesLock sync.RWMutex
lockLogicalDevice sync.RWMutex
+ lockDeviceGraph sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -71,6 +72,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
agent.lockLogicalPortsNo = sync.RWMutex{}
+ agent.lockDeviceGraph = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
return &agent
@@ -130,8 +132,6 @@
// Setup the local list of logical ports
agent.addLogicalPortsToMap(ld.Ports)
- // Setup the device graph
- agent.generateDeviceGraph()
}
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -161,6 +161,10 @@
return status.Error(codes.Internal, "logical-device-proxy-null")
}
+ // Setup the device graph - run it in its own routine
+ if loadFromdB {
+ go agent.generateDeviceGraph()
+ }
return nil
}
@@ -267,32 +271,53 @@
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Flows = flows
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.Meters = meters
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+ return err
}
return nil
}
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+ ld, err := agent.getLogicalDeviceWithoutLock()
+ if err != nil {
+ return status.Errorf(codes.Internal, "logical-device-absent:%s", agent.logicalDeviceId)
+ }
+ log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ cloned.FlowGroups = flowGroups
+
+ if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
+ return err
}
return nil
}
@@ -507,12 +532,33 @@
return nil
}
+//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
+//that device graph was generated.
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+ if ld, err := agent.GetLogicalDevice(); err != nil {
+ log.Errorw("get-logical-device-error", log.Fields{"error": err})
+ return err
+ } else {
+ agent.lockDeviceGraph.Lock()
+ defer agent.lockDeviceGraph.Unlock()
+ if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+ return nil
+ }
+ log.Debug("Generation of device graph required")
+ agent.generateDeviceGraph()
+ }
+ return nil
+}
+
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
log.Debug("updateFlowTable")
if flow == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(flow)
@@ -535,6 +581,9 @@
if groupMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(groupMod)
@@ -553,6 +602,9 @@
if meterMod == nil {
return nil
}
+ if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+ return err
+ }
switch meterMod.GetCommand() {
case ofp.OfpMeterModCommand_OFPMC_ADD:
return agent.meterAdd(meterMod)
@@ -765,7 +817,7 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
- log.Debug("flowAdd")
+ log.Debugw("flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
}
@@ -826,6 +878,8 @@
changed = true
}
}
+ log.Debugw("flowAdd-changed", log.Fields{"changed": changed})
+
if changed {
var flowMetadata voltha.FlowMetadata
if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
@@ -1500,14 +1554,14 @@
//generateDeviceGraph regenerates the device graph
func (agent *LogicalDeviceAgent) generateDeviceGraph() {
- log.Debugf("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
// Get the latest logical device
if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
log.Errorw("logical-device-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
} else {
- log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "deviceGraph": agent.deviceGraph, "lPorts": len(ld.Ports)})
+ log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceId, "lPorts": len(ld.Ports)})
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}