[VOL-2995] Improve Core performance
This commit consists of the following changes with the aim to
improve the Core performance:
1) Use a hybrid approach of pre-route calculation and route
calculation on-demand. For example, attempts to pre-calculate
routes will be made whenever an NNI/UNI port is discovered. The
attempt may fail if there are not enough ports to generate a
route. When a flow is received and the route is not available
then only the route relevant to that flow will be created on
demand.
2) Change some of the route calculation flow such that the
process does not need to go and grab the latest version of the
device, which could lead to higher latency, especially if that
device is busy with other processing.
3) Change the logic when reporting added ports to ONOS such that
routes are calculated (at least an attempt made) before sending
a port create notification to ONOS.
4) Move peer port creation into its own go routine thereby
removing the lock on a child device much earlier.
5) Wait until a request for port capability is received before
removing the lock on a device. A better approach is required
where the adapter will need to report the port capability along
with the port creation event. However, this requires a separate
Jira as changes will be required in the API.
6) Remove some unnecessary proto.clones. Those are the obvious
ones. Removal of other proto.clones will be done in a separate
commit.
7) Fix a core panic when concurrent requests are made to the
route map.
Change-Id: I2bafc99dbf10d7026572a44af0b88a31b5eb1887
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index fa1f657..0c373ea 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -598,6 +598,11 @@
}
func (nb *NBTest) deleteAllDevices(t *testing.T, nbi *NBIHandler) {
+ devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
+ if len(devices.Items) == 0 {
+ // Nothing to do
+ return
+ }
//Get an OLT device
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
@@ -1080,11 +1085,26 @@
},
}
flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
- _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+ maxTries := 3
+ var err error
+ for {
+ if _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP); err == nil {
+ if maxTries < 3 {
+ t.Log("Re-sending EAPOL flow succeeded for port:", port)
+ }
+ break
+ }
+ t.Log("Sending EAPOL flows fail:", err)
+ time.Sleep(50 * time.Millisecond)
+ maxTries--
+ if maxTries == 0 {
+ break
+ }
+ }
assert.Nil(t, err)
}
-func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup, flowAddFail bool, flowDelete bool) {
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup, flowAddFail bool, flowDeleteFail bool) {
defer wg.Done()
// Clear any existing flows on the adapters
@@ -1092,8 +1112,8 @@
nb.onuAdapter.ClearFlows()
// Set the adapter actions on flow addition/deletion
- nb.oltAdapter.SetFlowAction(flowAddFail, flowDelete)
- nb.onuAdapter.SetFlowAction(flowAddFail, flowDelete)
+ nb.oltAdapter.SetFlowAction(flowAddFail, flowDeleteFail)
+ nb.onuAdapter.SetFlowAction(flowAddFail, flowDeleteFail)
// Wait until a logical device is ready
var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
@@ -1199,7 +1219,7 @@
// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
var wg sync.WaitGroup
wg.Add(1)
- go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, true, true)
+ go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, true, false)
// Create the device with valid data
oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
@@ -1280,7 +1300,7 @@
// 4. Test Enable a device
nb.testEnableDevice(t, nbi)
- // 5. Test disable and ReEnable a root device
+ //// 5. Test disable and ReEnable a root device
nb.testDisableAndReEnableRootDevice(t, nbi)
// 6. Test disable and Enable pon port of OLT device
@@ -1298,6 +1318,9 @@
// 10. Test omci test
nb.testStartOmciTestAction(t, nbi)
+ // 11. Remove all devices from tests above
+ nb.deleteAllDevices(t, nbi)
+
// 11. Test flow add failure
nb.testFlowAddFailure(t, nbi)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 901b27f..c8f03e7 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -240,7 +240,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
- return proto.Clone(agent.device).(*voltha.Device)
+ return agent.device
}
// enableDevice activates a preprovisioned or a disable device
@@ -891,8 +891,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
@@ -1461,6 +1460,7 @@
cloned.Ports = append(cloned.Ports, ponPort)
logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
}
+
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
@@ -1553,13 +1553,12 @@
return errors.New("device agent stopped")
}
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.clusterDataProxy.Update(updateCtx, "devices/"+agent.deviceID, device); err != nil {
+ if err := agent.clusterDataProxy.Update(ctx, "devices/"+agent.deviceID, device); err != nil {
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
- agent.device = proto.Clone(device).(*voltha.Device)
+ agent.device = device
return nil
}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 15eb677..7723b74 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -47,7 +47,6 @@
clusterDataProxy *model.Proxy
stopped bool
deviceRoutes *route.DeviceRoutes
- lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -66,7 +65,7 @@
}
func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
- deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
+ deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
var agent LogicalAgent
agent.logicalDeviceID = id
agent.serialNumber = sn
@@ -76,11 +75,12 @@
agent.ldeviceMgr = ldeviceMgr
agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
agent.logicalPortsNo = make(map[uint32]bool)
- agent.defaultTimeout = timeout
+ agent.defaultTimeout = defaultTimeout
agent.requestQueue = coreutils.NewRequestQueue()
agent.meters = make(map[uint32]*MeterChunk)
agent.flows = make(map[uint64]*FlowChunk)
agent.groups = make(map[uint32]*GroupChunk)
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return &agent
}
@@ -251,7 +251,7 @@
return nil
}
-func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) addFlowsAndGroupsToDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
@@ -259,10 +259,11 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
+ start := time.Now()
+ if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
response.Done()
@@ -272,7 +273,7 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -280,10 +281,11 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
+ start := time.Now()
+ if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
response.Done()
@@ -292,7 +294,7 @@
return responses
}
-func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -300,9 +302,9 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
@@ -312,7 +314,7 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
for _, flow := range flows.Items {
@@ -327,9 +329,9 @@
}
logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.deleteParentFlows(ctx, agent.rootDeviceID, uniPort, metadata); err != nil {
+ if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error("flow-delete-failed", log.Fields{"device-id": agent.rootDeviceID, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 5d35251..0e811e7 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -20,8 +20,6 @@
"context"
"errors"
"fmt"
- "strconv"
-
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
@@ -31,6 +29,7 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "strconv"
)
//updateFlowTable updates the flow table of that logical device
@@ -40,9 +39,6 @@
return nil
}
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(ctx, flow)
@@ -170,7 +166,8 @@
return changed, updated, err
}
}
- respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
+
+ respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, &flowMetadata)
// Create the go routines to wait
go func() {
// Wait for completion
@@ -218,7 +215,7 @@
}
// Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(deviceRules, metadata)
// Wait for the responses
go func() {
@@ -300,9 +297,9 @@
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: toDelete}, &flowMetadata)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
}
// Wait for the responses
@@ -378,9 +375,9 @@
}
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: flowsToDelete}, &flowMetadata)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
}
// Wait for completion
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index a0d6c4a..73caa07 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -37,10 +37,6 @@
return nil
}
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
-
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(ctx, groupMod)
@@ -92,7 +88,7 @@
logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
// Update the devices
- respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+ respChnls := agent.addFlowsAndGroupsToDevices(deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
@@ -172,7 +168,7 @@
logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
// Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+ respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, nil)
// Wait for completion
go func() {
@@ -217,7 +213,7 @@
}
// Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+ respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7845ad5..7229e05 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -49,22 +49,36 @@
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
- if port.Type == voltha.Port_ETHERNET_NNI {
+ switch port.Type {
+ case voltha.Port_ETHERNET_NNI:
if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, true)
- } else if port.Type == voltha.Port_ETHERNET_UNI {
+ case voltha.Port_ETHERNET_UNI:
if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, false)
- } else {
- // Update the device routes to ensure all routes on the logical device have been calculated
- if err = agent.buildRoutes(ctx); err != nil {
- // Not an error - temporary state
- logger.Warnw("failed-to-update-routes", log.Fields{"device-id": device.Id, "port": port, "error": err})
- }
+ case voltha.Port_PON_OLT:
+ // Rebuilt the routes on Parent PON port addition
+ go func() {
+ if err = agent.buildRoutes(ctx); err != nil {
+ // Not an error - temporary state
+ logger.Infow("failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ }
+ }()
+ //fallthrough
+ case voltha.Port_PON_ONU:
+ // Add the routes corresponding to that child device
+ go func() {
+ if err = agent.updateAllRoutes(ctx, device); err != nil {
+ // Not an error - temporary state
+ logger.Infow("failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ }
+ }()
+ default:
+ return fmt.Errorf("invalid port type %v", port)
}
return nil
}
@@ -360,13 +374,15 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
+
+ defer agent.requestQueue.RequestComplete()
if agent.portExist(device, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
return false, nil
}
- agent.requestQueue.RequestComplete()
+ // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
+ // request that the Core makes following a port create event.
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -375,17 +391,6 @@
return false, err
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
portCap.Port.RootPort = true
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
lp.DeviceId = device.Id
@@ -402,19 +407,11 @@
}
clonedPorts = append(clonedPorts, lp)
- if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
+ if err = agent.addLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts, lp, device); err != nil {
logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
return false, err
}
- // Update the device routes with this new logical port
- clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
- }
- }()
-
return true, nil
}
@@ -441,13 +438,16 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
+ defer agent.requestQueue.RequestComplete()
if agent.portExist(childDevice, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
return false, nil
}
- agent.requestQueue.RequestComplete()
+
+ // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
+ // request that the Core makes following a port create event.
+
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -455,15 +455,7 @@
logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return false, err
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
+
// Get stored logical device
ldevice := agent.getLogicalDeviceWithoutLock()
@@ -478,17 +470,10 @@
clonedPorts = make([]*voltha.LogicalPort, 0)
}
clonedPorts = append(clonedPorts, portCap.Port)
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
+
+ if err = agent.addLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts, portCap.Port, childDevice); err != nil {
return false, err
}
- // Update the device graph with this new logical port
- clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
return true, nil
}
@@ -508,6 +493,31 @@
return nil
}
+// addLogicalDevicePortsWithoutLock adds the new ports to the logical device, updates the routes associated with
+// those new ports and sends an add port event to the OF controller
+func (agent *LogicalAgent) addLogicalDevicePortsWithoutLock(ctx context.Context, lDevice *voltha.LogicalDevice, newPorts []*voltha.LogicalPort, lp *voltha.LogicalPort, device *voltha.Device) error {
+ oldPorts := lDevice.Ports
+ lDevice.Ports = newPorts
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+ return err
+ }
+
+ // Setup the routes for this device and then send the port update event to the OF Controller
+ go func() {
+ // First setup the routes
+ if err := agent.updateRoutes(context.Background(), device, lp, newPorts); err != nil {
+ // This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
+ // created yet.
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+ }
+
+ // Send a port update event
+ agent.portUpdated(oldPorts, newPorts)
+ }()
+
+ return nil
+}
+
// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
newPorts = make(map[string]*voltha.LogicalPort, len(newList))
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 6736160..fa96caf 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -18,30 +18,26 @@
import (
"context"
- "errors"
"fmt"
-
"github.com/opencord/voltha-go/rw_core/route"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
-// GetRoute returns route
+// GetRoute returns a route
func (agent *LogicalAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
logger.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
routes := make([]route.Hop, 0)
// Note: A port value of 0 is equivalent to a nil port
- // Consider different possibilities
+ // Controller-bound flow
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
if agent.isNNIPort(ingressPortNo) {
//This is a trap on the NNI Port
- if len(agent.deviceRoutes.Routes) == 0 {
+ if agent.deviceRoutes.IsRoutesEmpty() {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// route with same IngressHop and EgressHop
hop := route.Hop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
@@ -49,62 +45,43 @@
routes = append(routes, hop)
return routes, nil
}
- //Return a 'half' route to make the flow decomposer logic happy
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
+
+ // Return a 'half' route to make the flow decomposer logic happy
+ routes, err := agent.deviceRoutes.GetHalfRoute(true, 0, 0)
+ if err != nil {
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
- //treat it as if the output port is the first NNI of the OLT
+ // Treat it as if the output port is the first NNI of the OLT
var err error
if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
logger.Warnw("no-nni-port", log.Fields{"error": err})
return nil, err
}
}
- //If ingress port is not specified (nil), it may be a wildcarded
- //route if egress port is OFPP_CONTROLLER or a nni logical port,
- //in which case we need to create a half-route where only the egress
- //hop is filled, the first hop is nil
+
+ //If ingress port is not specified (nil), it may be a wildcarded route if egress port is OFPP_CONTROLLER or a nni
+ // logical port, in which case we need to create a half-route where only the egress hop is filled, the first hop is nil
if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
- // We can use the 2nd hop of any upstream route, so just find the first upstream:
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
+ routes, err := agent.deviceRoutes.GetHalfRoute(true, ingressPortNo, egressPortNo)
+ if err != nil {
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
+
//If egress port is not specified (nil), we can also can return a "half" route
if egressPortNo == 0 {
- for routeLink, path := range agent.deviceRoutes.Routes {
- if routeLink.Ingress == ingressPortNo {
- routes = append(routes, path[0])
- routes = append(routes, route.Hop{})
- return routes, nil
- }
+ routes, err := agent.deviceRoutes.GetHalfRoute(false, ingressPortNo, egressPortNo)
+ if err != nil {
+ return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
- // Return the pre-calculated route
- return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
-}
-func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
- logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
- for routeLink, route := range agent.deviceRoutes.Routes {
- logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
- if ingress == routeLink.Ingress && egress == routeLink.Egress {
- return route, nil
- }
- }
- return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingress, egress)
+ // Return the pre-calculated route
+ return agent.deviceRoutes.GetRoute(ctx, ingressPortNo, egressPortNo)
}
// GetDeviceRoutes returns device graph
@@ -112,30 +89,6 @@
return agent.deviceRoutes
}
-//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
-//that device graph was generated.
-func (agent *LogicalAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
- agent.lockDeviceRoutes.Lock()
- defer agent.lockDeviceRoutes.Unlock()
-
- ld, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return err
- }
-
- if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
- return nil
- }
- logger.Debug("Generation of device route required")
- if err := agent.buildRoutes(ctx); err != nil {
- // No Route is not an error
- if !errors.Is(err, route.ErrNoRoute) {
- return err
- }
- }
- return nil
-}
-
//rebuildRoutes rebuilds the device routes
func (agent *LogicalAgent) buildRoutes(ctx context.Context) error {
logger.Debugf("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
@@ -160,17 +113,28 @@
}
//updateRoutes updates the device routes
-func (agent *LogicalAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
- logger.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, device *voltha.Device, lp *voltha.LogicalPort, lps []*voltha.LogicalPort) error {
+ logger.Debugw("updateRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "port:": lp})
+
+ if err := agent.deviceRoutes.AddPort(ctx, lp, device, lps); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
- if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+ if err := agent.deviceRoutes.Print(); err != nil {
+ return err
}
- if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
+ return nil
+}
+
+//updateAllRoutes updates the device routes using all the logical ports on that device
+func (agent *LogicalAgent) updateAllRoutes(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw("updateAllRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "ports-count": len(device.Ports)})
+
+ ld, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return err
+ }
+
+ if err := agent.deviceRoutes.AddAllPorts(ctx, device, ld.Ports); err != nil {
return err
}
if err := agent.deviceRoutes.Print(); err != nil {
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 0c4448b..cf1301f 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -725,36 +725,40 @@
return status.Errorf(codes.NotFound, "%s", device.Id)
}
+func (dMgr *Manager) addPeerPort(ctx context.Context, deviceID string, port *voltha.Port) error {
+ meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
+ for _, peerPort := range port.Peers {
+ if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
+ if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
+ return err
+ }
+ }
+ }
+ // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
+ // then a logical port will be added to the logical device and the device route generated. If the port is a
+ // PON port then only the device graph will be generated.
+ device, err := dMgr.getDevice(ctx, deviceID)
+ if err != nil {
+ return err
+ }
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port); err != nil {
+ return err
+ }
+ return nil
+}
+
func (dMgr *Manager) AddPort(ctx context.Context, deviceID string, port *voltha.Port) error {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent != nil {
if err := agent.addPort(ctx, port); err != nil {
return err
}
- // Setup peer ports
- meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
- for _, peerPort := range port.Peers {
- if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
- if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
- logger.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
- return err
- }
+ // Setup peer ports in its own routine
+ go func() {
+ if err := dMgr.addPeerPort(ctx, deviceID, port); err != nil {
+ logger.Errorw("unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
- }
- // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
- // then a logical port will be added to the logical device and the device graph generated. If the port is a
- // PON port then only the device graph will be generated.
- if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
- go func() {
- err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port)
- if err != nil {
- logger.Errorw("unable-to-update-logical-port", log.Fields{"error": err})
- }
- }()
- } else {
- logger.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceID})
- return err
- }
+ }()
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)