[VOL-1553] Improve device graph performance
This update focuses on generating routes using the device graph.
It improves performance by at least a factor of five.
Change-Id: I79bdbca8ea3d134e87848e45140d07ee3831f12c
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 732e9cf..e123be7 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -35,7 +35,7 @@
)
type LogicalDeviceAgent struct {
- logicalDeviceId string
+ logicalDeviceId string
//lastData *voltha.LogicalDevice
rootDeviceId string
deviceMgr *DeviceManager
@@ -46,9 +46,9 @@
DefaultFlowRules *fu.DeviceRules
flowProxy *model.Proxy
groupProxy *model.Proxy
- ldProxy *model.Proxy
- portProxies map[string]*model.Proxy
- portProxiesLock sync.RWMutex
+ ldProxy *model.Proxy
+ portProxies map[string]*model.Proxy
+ portProxiesLock sync.RWMutex
lockLogicalDevice sync.RWMutex
flowDecomposer *fd.FlowDecomposer
}
@@ -246,26 +246,21 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
-
-func (agent *LogicalDeviceAgent) addLogicalPort (device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
- var changed bool
var err error
if port.Type == voltha.Port_ETHERNET_NNI {
- if changed, err = agent.addNNILogicalPort(device, port); err != nil {
+ if _, err = agent.addNNILogicalPort(device, port); err != nil {
return err
}
} else if port.Type == voltha.Port_ETHERNET_UNI {
- if changed, err = agent.addUNILogicalPort(device, port); err != nil {
+ if _, err = agent.addUNILogicalPort(device, port); err != nil {
return err
}
} else {
log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
return nil
}
- if changed {
- go agent.setupDeviceGraph()
- }
return nil
}
@@ -284,24 +279,16 @@
}
//Get UNI port number
- changesMade := false
for _, port := range device.Ports {
- changed := false
if port.Type == voltha.Port_ETHERNET_NNI {
- if changed, err = agent.addNNILogicalPort(device, port); err != nil {
+ if _, err = agent.addNNILogicalPort(device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
- } else {
- changesMade = changed || changesMade
}
}
}
- if changesMade {
- go agent.setupDeviceGraph()
- }
return err
}
-
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
//now := time.Now()
@@ -311,20 +298,13 @@
var err error
//Get UNI port number
- changesMade := false
for _, port := range childDevice.Ports {
- changed := false
if port.Type == voltha.Port_ETHERNET_UNI {
- if changed, err = agent.addUNILogicalPort(childDevice, port); err != nil {
+ if _, err = agent.addUNILogicalPort(childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
- } else {
- changesMade = changed || changesMade
}
}
}
- if changesMade {
- go agent.setupDeviceGraph()
- }
return err
}
@@ -811,7 +791,7 @@
if len(agent.deviceGraph.Routes) == 0 {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// internal route
- hop := graph.RouteHop{DeviceID:ld.RootDeviceId, Ingress:ingressPortNo, Egress:egressPortNo}
+ hop := graph.RouteHop{DeviceID: ld.RootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
routes = append(routes, hop)
routes = append(routes, hop)
return routes
@@ -863,15 +843,6 @@
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
}
-// updateRoutes updates the device routes whenever there is a device or port changes relevant to this
-// logical device. TODO: Add more heuristics to this process to update the routes where a change has occurred
-// instead of rebuilding the entire set of routes
-func (agent *LogicalDeviceAgent) updateRoutes() {
- if ld, err := agent.GetLogicalDevice(); err == nil {
- agent.deviceGraph.ComputeRoutes(ld.Ports)
- }
-}
-
func (agent *LogicalDeviceAgent) rootDeviceDefaultRules() *fu.FlowsAndGroups {
return fu.NewFlowsAndGroups()
}
@@ -895,7 +866,7 @@
}
//it is possible that the downstream ports are not created, but the flow_decomposition has already
//kicked in. In such scenarios, cut short the processing and return.
- if len(downstreamPorts) == 0 || len(upstreamPorts) == 0{
+ if len(downstreamPorts) == 0 || len(upstreamPorts) == 0 {
return fg
}
// set up the default flows
@@ -965,14 +936,13 @@
func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
// Get latest
- var lDevice *voltha.LogicalDevice
var err error
- if lDevice, err = agent.GetLogicalDevice(); err != nil {
+ if _, err = agent.GetLogicalDevice(); err != nil {
return fu.NewDeviceRules()
}
if agent.DefaultFlowRules == nil { // Nothing setup yet
- agent.deviceGraph = graph.NewDeviceGraph(agent.deviceMgr.GetDevice)
- agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+ // Setup device graph if needed
+ agent.setupDeviceGraph()
agent.DefaultFlowRules = agent.generateDefaultRules()
}
return agent.DefaultFlowRules
@@ -1000,12 +970,24 @@
//setupDeviceGraph creates the device graph if not done already
func (agent *LogicalDeviceAgent) setupDeviceGraph() {
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
if agent.deviceGraph == nil {
- agent.deviceGraph = graph.NewDeviceGraph(agent.deviceMgr.GetDevice)
- agent.updateRoutes()
+ agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
+ if ld, err := agent.getLogicalDeviceWithoutLock(); err == nil {
+ agent.deviceGraph.ComputeRoutes(ld.Ports)
+ }
}
}
+//updateDeviceGraph creates the device graph if not done already, then adds the given logical port to it
+func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+ if agent.deviceGraph == nil {
+ agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
+ }
+ agent.deviceGraph.AddPort(lp)
+}
+
func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -1035,10 +1017,10 @@
var err error
for deviceId, value := range deviceRules.GetRules() {
if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+ log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
}
if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+ log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
}
}
@@ -1073,10 +1055,10 @@
var err error
for deviceId, value := range deviceRules.GetRules() {
if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+ log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
}
if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+ log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
}
}
@@ -1110,7 +1092,7 @@
// Send the port change event to the OF controller
agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason:ofp.OfpPortReason_OFPPR_ADD, Desc:port.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: port.OfpPort})
return nil
}
@@ -1140,13 +1122,13 @@
// Send the port change event to the OF controller
agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason:ofp.OfpPortReason_OFPPR_DELETE, Desc:port.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: port.OfpPort})
return nil
}
// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
-func diff(oldList , newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts []*voltha.LogicalPort) {
+func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts []*voltha.LogicalPort) {
newPorts = make([]*voltha.LogicalPort, 0)
changedPorts = make([]*voltha.LogicalPort, 0)
deletedPorts = make([]*voltha.LogicalPort, 0)
@@ -1212,26 +1194,25 @@
// Send the port change events to the OF controller
for _, new := range newPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason:ofp.OfpPortReason_OFPPR_ADD, Desc:new.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: new.OfpPort})
}
for _, change := range changedPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason:ofp.OfpPortReason_OFPPR_MODIFY, Desc:change.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
}
for _, del := range deletedPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason:ofp.OfpPortReason_OFPPR_DELETE, Desc:del.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
}
return nil
}
-
// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
// added and an eror in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
//now := time.Now()
//defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
@@ -1286,10 +1267,15 @@
log.Errorw("error-updating-logical-device", log.Fields{"error": err})
return false, err
}
+
+ // Update the device graph with this new logical port
+ clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
+ go agent.updateDeviceGraph(clonedLP)
+
return true, nil
}
-func (agent *LogicalDeviceAgent) portExist (device *voltha.Device, port *voltha.Port) bool {
+func (agent *LogicalDeviceAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
if ldevice, _ := agent.getLogicalDeviceWithoutLock(); ldevice != nil {
for _, lPort := range ldevice.Ports {
if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo && lPort.Id == port.Label {
@@ -1300,12 +1286,11 @@
return false
}
-
// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
// added and an eror in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
//now := time.Now()
//defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
@@ -1350,7 +1335,14 @@
cloned.Ports = make([]*voltha.LogicalPort, 0)
}
cloned.Ports = append(cloned.Ports, portCap.Port)
- return true, agent.updateLogicalDeviceWithoutLock(cloned)
+ if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ return false, err
+ }
+
+ // Update the device graph with this new logical port
+ clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ go agent.updateDeviceGraph(clonedLP)
+ return true, nil
}
}
@@ -1360,7 +1352,7 @@
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet); err != nil {
- log.Error("packetout-failed", log.Fields{"logicalDeviceID":agent.rootDeviceId})
+ log.Error("packetout-failed", log.Fields{"logicalDeviceID": agent.rootDeviceId})
}
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 2fc0f1e..2417b60 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -336,7 +336,7 @@
return err
}
// Update the device routes - let it run in its own go routine as it can take time
- go agent.updateRoutes()
+ //go agent.updateRoutes()
}
return nil
}