[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Improve the performance of adding a flow to a logical device,
decomposing the flow into parent and child device flows, and sending
the resulting flows to the adapters.
2) Format a number of files with gofmt.
3) Ensure the device graph cache gets updated when a new port that
belongs to a device already in the cache is added to the graph.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9496aa1..a2b4494 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -37,25 +37,29 @@
type LogicalDeviceAgent struct {
logicalDeviceId string
//lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- exitChannel chan int
- deviceGraph *graph.DeviceGraph
- DefaultFlowRules *fu.DeviceRules
- flowProxy *model.Proxy
- groupProxy *model.Proxy
- ldProxy *model.Proxy
- portProxies map[string]*model.Proxy
- portProxiesLock sync.RWMutex
- lockLogicalDevice sync.RWMutex
- flowDecomposer *fd.FlowDecomposer
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ deviceGraph *graph.DeviceGraph
+ DefaultFlowRules *fu.DeviceRules
+ flowProxy *model.Proxy
+ groupProxy *model.Proxy
+ ldProxy *model.Proxy
+ portProxies map[string]*model.Proxy
+ portProxiesLock sync.RWMutex
+ lockLogicalDevice sync.RWMutex
+ logicalPortsNo map[uint32]bool //value is true for NNI port
+ lockLogicalPortsNo sync.RWMutex
+ flowDecomposer *fd.FlowDecomposer
+ includeDefaultFlows bool
+ defaultTimeout int64
}
func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
deviceMgr *DeviceManager,
- cdProxy *model.Proxy) *LogicalDeviceAgent {
+ cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceId = id
@@ -67,6 +71,10 @@
agent.lockLogicalDevice = sync.RWMutex{}
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
+ agent.lockLogicalPortsNo = sync.RWMutex{}
+ agent.logicalPortsNo = make(map[uint32]bool)
+ agent.includeDefaultFlows = true
+ agent.defaultTimeout = timeout
return &agent
}
@@ -131,12 +139,11 @@
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
false)
- agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
// TODO: Use a port proxy once the POST_ADD is fixed
agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
+ agent.includeDefaultFlows = true
+
agent.lockLogicalDevice.Unlock()
return nil
@@ -148,13 +155,6 @@
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- // Unregister to teh callbacks
- if agent.flowProxy != nil {
- agent.flowProxy.UnregisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- }
- if agent.groupProxy != nil {
- agent.groupProxy.UnregisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
- }
//Remove the logical device from the model
if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -246,6 +246,29 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+// updateLogicalPort handles a port reported for the given device. An ETHERNET_NNI
+// or ETHERNET_UNI port is added to the logical device as the matching logical
+// port and recorded in the logical port number map (flagged as NNI or not). Any
+// other port type only triggers a recomputation of the routes. The first error
+// encountered is returned; nil is returned on success.
+func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+	switch port.Type {
+	case voltha.Port_ETHERNET_NNI:
+		if _, addErr := agent.addNNILogicalPort(device, port); addErr != nil {
+			return addErr
+		}
+		agent.addLogicalPortToMap(port.PortNo, true)
+	case voltha.Port_ETHERNET_UNI:
+		if _, addErr := agent.addUNILogicalPort(device, port); addErr != nil {
+			return addErr
+		}
+		agent.addLogicalPortToMap(port.PortNo, false)
+	default:
+		// Update the device graph to ensure all routes on the logical device have been calculated
+		if routeErr := agent.updateRoutes(device, port); routeErr != nil {
+			log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": routeErr})
+			return routeErr
+		}
+	}
+	return nil
+}
+
func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
@@ -253,10 +276,12 @@
if _, err = agent.addNNILogicalPort(device, port); err != nil {
return err
}
+ agent.addLogicalPortToMap(port.PortNo, true)
} else if port.Type == voltha.Port_ETHERNET_UNI {
if _, err = agent.addUNILogicalPort(device, port); err != nil {
return err
}
+ agent.addLogicalPortToMap(port.PortNo, false)
} else {
log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
return nil
@@ -266,15 +291,13 @@
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
- //now := time.Now()
- //defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
var device *voltha.Device
if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
- log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": device.Id})
+ log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceId})
return err
}
@@ -284,6 +307,7 @@
if _, err = agent.addNNILogicalPort(device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
+ agent.addLogicalPortToMap(port.PortNo, true)
}
}
return err
@@ -291,8 +315,6 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
- //now := time.Now()
- //defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -303,6 +325,7 @@
if _, err = agent.addUNILogicalPort(childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
+ agent.addLogicalPortToMap(port.PortNo, false)
}
}
return err
@@ -357,17 +380,6 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
-//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device. This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
- groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
- copy(groupsCloned, groups)
- if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
- return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
- }
- return nil
-}
-
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
log.Debug("flowAdd")
@@ -389,6 +401,7 @@
flows = lDevice.Flows.Items
}
+ updatedFlows := make([]*ofp.OfpFlowStats, 0)
//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
changed := false
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
@@ -400,6 +413,7 @@
// Add flow
flow := fd.FlowStatsEntryFromFlowModMessage(mod)
flows = append(flows, flow)
+ updatedFlows = append(updatedFlows, flow)
changed = true
}
} else {
@@ -411,26 +425,65 @@
flow.ByteCount = oldFlow.ByteCount
flow.PacketCount = oldFlow.PacketCount
}
- flows[idx] = flow
+ if !reflect.DeepEqual(oldFlow, flow) {
+ flows[idx] = flow
+ updatedFlows = append(updatedFlows, flow)
+ changed = true
+ }
} else {
flows = append(flows, flow)
+ updatedFlows = append(updatedFlows, flow)
+ changed = true
}
- changed = true
}
if changed {
+ // Launch a routine to decompose the flows
+ if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups, agent.includeDefaultFlows); err != nil {
+ log.Errorf("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+
+ // We no longer need to send the default flows, unless there is a change in device topology
+ agent.includeDefaultFlows = false
+
// Update model
flowsToUpdate := &ofp.Flows{}
if lDevice.Flows != nil {
flowsToUpdate = &ofp.Flows{Items: flows}
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
}
return nil
}
+// decomposeAndSendFlows decomposes the given logical-device flows and groups into
+// per-device rules using the agent's flow decomposer, then pushes each device's
+// flows and groups through the device manager, one goroutine per device. It waits
+// for every goroutine to report using agent.defaultTimeout (units are defined by
+// fu.WaitForNilOrErrorResponses - TODO confirm) and returns a codes.Aborted error
+// when that wait reports anything other than nil.
+//
+// A nil flows or groups argument is treated as an empty collection.
+func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups, includeDefaultFlows bool) error {
+	log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+	// The caller may hand over a logical device's FlowGroups/Flows field, which can be nil;
+	// guard against a nil-pointer dereference below.
+	if flows == nil {
+		flows = &ofp.Flows{}
+	}
+	if groups == nil {
+		groups = &ofp.FlowGroups{}
+	}
+
+	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, includeDefaultFlows)
+	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		// Buffer of one: each goroutine sends exactly one result, so it can always
+		// terminate even if the waiter has already given up (e.g. on timeout).
+		ch := make(chan interface{}, 1)
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
+				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+				// Do not fall through and send a second value on the channel.
+				return
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
+	}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
log.Debug("flowDelete")
@@ -740,15 +793,6 @@
return nil
}
-func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
- for _, pNo := range nniPortsNo {
- if pNo == portNo {
- return true
- }
- }
- return false
-}
-
func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceGraph.Routes)})
for routeLink, route := range agent.deviceGraph.Routes {
@@ -763,67 +807,56 @@
func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
- // Get the updated logical device
- var ld *ic.LogicalDevice
routes := make([]graph.RouteHop, 0)
- var err error
- if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
- return nil
- }
- nniLogicalPortsNo := make([]uint32, 0)
- for _, logicalPort := range ld.Ports {
- if logicalPort.RootPort {
- nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
- }
- }
- if len(nniLogicalPortsNo) == 0 {
- log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
- return nil
- }
+
// Note: A port value of 0 is equivalent to a nil port
// Consider different possibilities
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
- log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
- if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
+ log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+ if agent.isNNIPort(ingressPortNo) {
log.Debug("returning-half-route")
//This is a trap on the NNI Port
if len(agent.deviceGraph.Routes) == 0 {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// internal route
- hop := graph.RouteHop{DeviceID: ld.RootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
+ hop := graph.RouteHop{DeviceID: agent.rootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
routes = append(routes, hop)
routes = append(routes, hop)
return routes
}
//Return a 'half' route to make the flow decomposer logic happy
for routeLink, route := range agent.deviceGraph.Routes {
- if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+ if agent.isNNIPort(routeLink.Egress) {
routes = append(routes, graph.RouteHop{}) // first hop is set to empty
routes = append(routes, route[1])
return routes
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
//treat it as if the output port is the first NNI of the OLT
- egressPortNo = nniLogicalPortsNo[0]
+ var err error
+ if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+ log.Warnw("no-nni-port", log.Fields{"error": err})
+ return nil
+ }
}
//If ingress port is not specified (nil), it may be a wildcarded
//route if egress port is OFPP_CONTROLLER or a nni logical port,
//in which case we need to create a half-route where only the egress
//hop is filled, the first hop is nil
- if ingressPortNo == 0 && isNNIPort(egressPortNo, nniLogicalPortsNo) {
+ if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
// We can use the 2nd hop of any upstream route, so just find the first upstream:
for routeLink, route := range agent.deviceGraph.Routes {
- if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+ if agent.isNNIPort(routeLink.Egress) {
routes = append(routes, graph.RouteHop{}) // first hop is set to empty
routes = append(routes, route[1])
return routes
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
//If egress port is not specified (nil), we can also can return a "half" route
@@ -835,10 +868,9 @@
return routes
}
}
- log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
-
// Return the pre-calculated route
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
}
@@ -935,16 +967,6 @@
}
func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
- // Get latest
- var err error
- if _, err = agent.GetLogicalDevice(); err != nil {
- return fu.NewDeviceRules()
- }
- if agent.DefaultFlowRules == nil { // Nothing setup yet
- // Setup device graph if needed
- agent.setupDeviceGraph()
- agent.DefaultFlowRules = agent.generateDefaultRules()
- }
return agent.DefaultFlowRules
}
@@ -968,101 +990,64 @@
return agent.deviceGraph
}
-//setupDeviceGraph creates the device graph if not done already
-func (agent *LogicalDeviceAgent) setupDeviceGraph() {
+//updateRoutes redo the device graph if not done already and setup the default rules as well
+func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+ log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "device": device.Id, "port": port})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
+ rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
- if ld, err := agent.getLogicalDeviceWithoutLock(); err == nil {
- agent.deviceGraph.ComputeRoutes(ld.Ports)
+ }
+ // Get all the logical ports on that logical device
+ if lDevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Errorf("unknown-logical-device", log.Fields{"error": err, "logicalDeviceId": agent.logicalDeviceId})
+ return err
+ } else {
+ //TODO: Find a better way to refresh only missing routes
+ agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+ }
+ deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+ for deviceId := range deviceNodeIds {
+ if deviceId == agent.rootDeviceId {
+ rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+ } else {
+ rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
}
}
+ agent.DefaultFlowRules = rules
+
+ // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+ // one when a flow request is received.
+ agent.includeDefaultFlows = true
+ agent.deviceGraph.Print()
+ return nil
}
-//updateDeviceGraph updates the device graph if not done already
+//updateDeviceGraph updates the device graph if not done already and setup the default rules as well
func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+ log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
agent.deviceGraph.AddPort(lp)
-}
-
-func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
- log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- var previousData *ofp.Flows
- var latestData *ofp.Flows
-
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- var groups *ofp.FlowGroups
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- groups = lDevice.FlowGroups
- log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- var err error
- for deviceId, value := range deviceRules.GetRules() {
- if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
- }
- if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
+ deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+ for deviceId := range deviceNodeIds {
+ if deviceId == agent.rootDeviceId {
+ rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+ } else {
+ rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
}
}
+ agent.DefaultFlowRules = rules
- return nil
-}
-
-func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
- log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- var previousData *ofp.FlowGroups
- var latestData *ofp.FlowGroups
-
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- var flows *ofp.Flows
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- flows = lDevice.Flows
- log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- var err error
- for deviceId, value := range deviceRules.GetRules() {
- if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
- }
- if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
- }
-
- }
- return nil
+ // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+ // one when a flow request is received.
+ agent.includeDefaultFlows = true
+ agent.deviceGraph.Print()
}
// portAdded is a callback invoked when a port is added to the logical device.
@@ -1192,9 +1177,9 @@
newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
// Send the port change events to the OF controller
- for _, new := range newPorts {
+ for _, newP := range newPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: new.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
}
for _, change := range changedPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
@@ -1213,8 +1198,6 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
- //now := time.Now()
- //defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1291,8 +1274,6 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
- //now := time.Now()
- //defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1337,7 +1318,6 @@
if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
return false, err
}
-
// Update the device graph with this new logical port
clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
go agent.updateDeviceGraph(clonedLP)
@@ -1361,3 +1341,39 @@
agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
}
+
+// addLogicalPortToMap records portNo in the logical port number map, holding the
+// write lock for the duration. The map value is the NNI flag. A port already
+// stored as NNI (value true) is left untouched; a port that is absent or stored
+// as non-NNI is (re)written with nniPort.
+func (agent *LogicalDeviceAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	if alreadyNNI := agent.logicalPortsNo[portNo]; alreadyNNI {
+		return
+	}
+	agent.logicalPortsNo[portNo] = nniPort
+}
+
+// deleteLogicalPortFromMap removes portNo from the logical port number map,
+// holding the write lock for the duration. It is a no-op when the port is not
+// present.
+func (agent *LogicalDeviceAgent) deleteLogicalPortFromMap(portNo uint32) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	// Delete unconditionally: the stored value is the NNI flag, not a presence
+	// marker, so testing it first would leave every non-NNI (false) entry behind.
+	delete(agent.logicalPortsNo, portNo)
+}
+
+// isNNIPort reports whether portNo is recorded in the logical port number map
+// as an NNI port. Unknown ports and ports recorded as non-NNI both yield false.
+// The map is read under the read lock.
+func (agent *LogicalDeviceAgent) isNNIPort(portNo uint32) bool {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	nni, known := agent.logicalPortsNo[portNo]
+	return known && nni
+}
+
+// getFirstNNIPort returns the lowest-numbered port recorded as NNI in the
+// logical port number map, or a codes.NotFound error when no NNI port is
+// recorded. The map is read under the read lock.
+func (agent *LogicalDeviceAgent) getFirstNNIPort() (uint32, error) {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	// Map iteration order is unspecified, so pick the smallest NNI port number
+	// to make the result stable across calls when several NNI ports exist.
+	var first uint32
+	found := false
+	for portNo, nni := range agent.logicalPortsNo {
+		if nni && (!found || portNo < first) {
+			first, found = portNo, true
+		}
+	}
+	if !found {
+		return 0, status.Error(codes.NotFound, "No NNI port found")
+	}
+	return first, nil
+}