[VOL-2576] Improve route calculation
This commit changes the way device routes are calculated, replacing
the device graph method. The graph method relies on a shortest-path
calculation, which is quite resource intensive. For instance, for a
PON network with 1 OLT having 8 PON ports, 64 ONUs per port and
4 UNIs per ONU, it took 96 secs to generate the 4096 routes. The
new method creates the routes directly from the device data, with
no intermediate step. Generating routes for the above topology now
takes 4ms.
Change-Id: I32bffe06d12ad0fea94002a39f217547dc55cdbf
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 7274e9f..857776f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,6 +21,7 @@
"encoding/hex"
"errors"
"fmt"
+ "github.com/opencord/voltha-go/rw_core/route"
"reflect"
"sync"
"time"
@@ -28,7 +29,6 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
- "github.com/opencord/voltha-go/rw_core/graph"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -47,7 +47,7 @@
ldeviceMgr *LogicalDeviceManager
clusterDataProxy *model.Proxy
exitChannel chan int
- deviceGraph *graph.DeviceGraph
+ deviceRoutes *route.DeviceRoutes
flowProxy *model.Proxy
groupProxy *model.Proxy
meterProxy *model.Proxy
@@ -55,7 +55,7 @@
portProxies map[string]*model.Proxy
portProxiesLock sync.RWMutex
lockLogicalDevice sync.RWMutex
- lockDeviceGraph sync.RWMutex
+ lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -78,7 +78,7 @@
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
agent.lockLogicalPortsNo = sync.RWMutex{}
- agent.lockDeviceGraph = sync.RWMutex{}
+ agent.lockDeviceRoutes = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
return &agent
@@ -205,9 +205,13 @@
return status.Error(codes.Internal, "logical-device-proxy-null")
}
- // Setup the device graph - run it in its own routine
+ // Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromdB {
- go agent.generateDeviceGraph(context.Background())
+ go func() { // best effort: a failure is only logged, never returned to the caller
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }()
}
return nil
}
@@ -344,10 +348,10 @@
}
agent.addLogicalPortToMap(port.PortNo, false)
} else {
- // Update the device graph to ensure all routes on the logical device have been calculated
- if err = agent.updateRoutes(ctx, device, port); err != nil {
- log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
- return err
+ // Update the device routes to ensure all routes on the logical device have been calculated
+ if err = agent.buildRoutes(ctx); err != nil {
+ // Not an error - temporary state
+ log.Warnw("failed-to-update-routes", log.Fields{"device-id": device.Id, "port": port, "error": err})
}
}
return nil
@@ -530,17 +534,21 @@
return nil
}
-//generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
+//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
//that device graph was generated.
-func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded(ctx context.Context) error {
+func (agent *LogicalDeviceAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
+ agent.lockDeviceRoutes.Lock()
+ defer agent.lockDeviceRoutes.Unlock()
+
ld := agent.GetLogicalDevice()
- agent.lockDeviceGraph.Lock()
- defer agent.lockDeviceGraph.Unlock()
- if agent.deviceGraph != nil && agent.deviceGraph.IsUpToDate(ld) {
+
+ if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
return nil
}
log.Debug("Generation of device graph required")
- agent.generateDeviceGraph(ctx)
+ if err := agent.buildRoutes(ctx); err != nil {
+ return err
+ }
return nil
}
@@ -550,7 +558,7 @@
if flow == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
+ if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
return err
}
switch flow.GetCommand() {
@@ -575,9 +583,10 @@
if groupMod == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
+ if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
return err
}
+
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(ctx, groupMod)
@@ -596,7 +605,7 @@
if meterMod == nil {
return nil
}
- if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
+ if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
return err
}
switch meterMod.GetCommand() {
@@ -857,7 +866,10 @@
log.Error("Meter-referred-in-flows-not-present")
return err
}
- deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+ if err != nil {
+ return err
+ }
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
@@ -973,7 +985,10 @@
log.Error("Meter-referred-in-flows-not-present")
return errors.New("Meter-referred-in-flows-not-present")
}
- deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+ if err != nil {
+ return err
+ }
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
@@ -1112,7 +1127,10 @@
log.Error("meter-referred-in-flows-not-present")
return err
}
- deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+ if err != nil {
+ return err
+ }
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
@@ -1203,7 +1221,10 @@
groupsChanged = true
}
if flowsChanged || groupsChanged {
- deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+ if err != nil {
+ return err
+ }
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
@@ -1292,8 +1313,16 @@
log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
- // Reset the logical device graph
- go agent.generateDeviceGraph(context.Background())
+
+ // Remove the logical port from cache
+ agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
+
+ // Reset the logical device routes
+ go func() { // best effort: a failure is only logged, never returned to the caller
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ log.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ }
+ }()
}
return nil
}
@@ -1304,20 +1333,31 @@
defer agent.lockLogicalDevice.Unlock()
logicalDevice := agent.getLogicalDeviceWithoutLock()
- updatedLPorts := []*voltha.LogicalPort{}
+ lPortstoKeep := []*voltha.LogicalPort{}
+ lPortsNoToDelete := []uint32{}
for _, logicalPort := range logicalDevice.Ports {
if logicalPort.DeviceId != deviceID {
- updatedLPorts = append(updatedLPorts, logicalPort)
+ lPortstoKeep = append(lPortstoKeep, logicalPort)
+ } else {
+ lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
}
}
- logicalDevice.Ports = updatedLPorts
- log.Debugw("updated-logical-ports", log.Fields{"ports": updatedLPorts})
+ logicalDevice.Ports = lPortstoKeep
+
+ log.Debugw("updated-logical-ports", log.Fields{"ports": lPortstoKeep})
if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
- // Reset the logical device graph
- go agent.generateDeviceGraph(context.Background())
+ // Remove the port from the cached logical ports set
+ agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
+
+ // Reset the logical device routes
+ go func() { // best effort: a failure is only logged, never returned to the caller
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }()
return nil
}
@@ -1364,22 +1404,21 @@
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
-func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
- log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceGraph.Routes)})
- for routeLink, route := range agent.deviceGraph.Routes {
+func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
+ log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
+ for routeLink, route := range agent.deviceRoutes.Routes {
log.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
if ingress == routeLink.Ingress && egress == routeLink.Egress {
- return route
+ return route, nil
}
}
- log.Warnw("no-route", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "ingress": ingress, "egress": egress})
- return nil
+ return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingress, egress)
}
// GetRoute returns route
-func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
+func (agent *LogicalDeviceAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
- routes := make([]graph.RouteHop, 0)
+ routes := make([]route.Hop, 0)
// Note: A port value of 0 is equivalent to a nil port
@@ -1388,30 +1427,29 @@
log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
if agent.isNNIPort(ingressPortNo) {
//This is a trap on the NNI Port
- if len(agent.deviceGraph.Routes) == 0 {
+ if len(agent.deviceRoutes.Routes) == 0 {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// route with same IngressHop and EgressHop
- hop := graph.RouteHop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
+ hop := route.Hop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
routes = append(routes, hop)
routes = append(routes, hop)
- return routes
+ return routes, nil
}
//Return a 'half' route to make the flow decomposer logic happy
- for routeLink, route := range agent.deviceGraph.Routes {
+ for routeLink, path := range agent.deviceRoutes.Routes {
if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, graph.RouteHop{}) // first hop is set to empty
- routes = append(routes, route[1])
- return routes
+ routes = append(routes, route.Hop{}) // first hop is set to empty
+ routes = append(routes, path[1])
+ return routes, nil
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
- return nil
+ return nil, status.Errorf(codes.FailedPrecondition, "no upstream route from:%d to:%d", ingressPortNo, egressPortNo)
}
//treat it as if the output port is the first NNI of the OLT
var err error
if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
log.Warnw("no-nni-port", log.Fields{"error": err})
- return nil
+ return nil, err
}
}
//If ingress port is not specified (nil), it may be a wildcarded
@@ -1420,27 +1458,25 @@
//hop is filled, the first hop is nil
if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
// We can use the 2nd hop of any upstream route, so just find the first upstream:
- for routeLink, route := range agent.deviceGraph.Routes {
+ for routeLink, path := range agent.deviceRoutes.Routes {
if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, graph.RouteHop{}) // first hop is set to empty
- routes = append(routes, route[1])
- return routes
+ routes = append(routes, route.Hop{}) // first hop is set to empty
+ routes = append(routes, path[1])
+ return routes, nil
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
- return nil
+ return nil, status.Errorf(codes.FailedPrecondition, "no upstream route from:%d to:%d", ingressPortNo, egressPortNo)
}
//If egress port is not specified (nil), we can also can return a "half" route
if egressPortNo == 0 {
- for routeLink, route := range agent.deviceGraph.Routes {
+ for routeLink, path := range agent.deviceRoutes.Routes {
if routeLink.Ingress == ingressPortNo {
- routes = append(routes, route[0])
- routes = append(routes, graph.RouteHop{})
- return routes
+ routes = append(routes, path[0])
+ routes = append(routes, route.Hop{})
+ return routes, nil
}
}
- log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
- return nil
+ return nil, status.Errorf(codes.FailedPrecondition, "no downstream route from:%d to:%d", ingressPortNo, egressPortNo)
}
// Return the pre-calculated route
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
@@ -1464,53 +1500,47 @@
return lPorts
}
-// GetDeviceGraph returns device graph
-func (agent *LogicalDeviceAgent) GetDeviceGraph() *graph.DeviceGraph {
- return agent.deviceGraph
+// GetDeviceRoutes returns device routes
+func (agent *LogicalDeviceAgent) GetDeviceRoutes() *route.DeviceRoutes {
+ return agent.deviceRoutes
}
-//updateRoutes rebuilds the device graph if not done already
-func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
- log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "device": device.Id, "port": port})
+//buildRoutes rebuilds the device routes from the logical device's current ports
+func (agent *LogicalDeviceAgent) buildRoutes(ctx context.Context) error {
+ log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- if agent.deviceGraph == nil {
- agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+ if agent.deviceRoutes == nil {
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
// Get all the logical ports on that logical device
lDevice := agent.getLogicalDeviceWithoutLock()
- //TODO: Find a better way to refresh only missing routes
- agent.deviceGraph.ComputeRoutes(ctx, lDevice.Ports)
- agent.deviceGraph.Print()
+ if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
+ return err
+ }
+ if err := agent.deviceRoutes.Print(); err != nil {
+ return err
+ }
+
return nil
}
-//updateDeviceGraph updates the device graph if not done already and setup the default rules as well
-func (agent *LogicalDeviceAgent) updateDeviceGraph(ctx context.Context, lp *voltha.LogicalPort) {
- log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+//updateRoutes adds the logical port lp to the device routes, creating the routes object if it does not exist yet
+func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
+ log.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- if agent.deviceGraph == nil {
- agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+ if agent.deviceRoutes == nil {
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
- agent.deviceGraph.AddPort(ctx, lp)
- agent.deviceGraph.Print()
-}
-
-//generateDeviceGraph regenerates the device graph
-func (agent *LogicalDeviceAgent) generateDeviceGraph(ctx context.Context) {
- log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- // Get the latest logical device
- ld := agent.getLogicalDeviceWithoutLock()
- log.Debugw("generating-graph", log.Fields{"lDeviceId": agent.logicalDeviceID, "lPorts": len(ld.Ports)})
- if agent.deviceGraph == nil {
- agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+ if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
+ return err
}
- agent.deviceGraph.ComputeRoutes(ctx, ld.Ports)
- agent.deviceGraph.Print()
+ if err := agent.deviceRoutes.Print(); err != nil {
+ return err
+ }
+ return nil
}
// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
@@ -1649,9 +1679,13 @@
return false, err
}
- // Update the device graph with this new logical port
+ // Update the device routes with this new logical port
clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
- go agent.updateDeviceGraph(context.Background(), clonedLP)
+ go func() { // best effort; only the clone is touched here since the caller still owns lp
+ if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
+ log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": clonedLP.OfpPort.PortNo, "error": err})
+ }
+ }()
return true, nil
}
@@ -1716,7 +1750,13 @@
}
// Update the device graph with this new logical port
clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- go agent.updateDeviceGraph(context.Background(), clonedLP)
+
+ go func() { // best effort: a failure is only logged, never returned to the caller
+ if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
+ log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }()
+
return true, nil
}
@@ -1752,6 +1792,14 @@
}
}
+func (agent *LogicalDeviceAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
+ agent.lockLogicalPortsNo.Lock()
+ defer agent.lockLogicalPortsNo.Unlock()
+ for _, pNo := range portsNo {
+ delete(agent.logicalPortsNo, pNo)
+ }
+}
+
func (agent *LogicalDeviceAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()