[VOL-2995] Improve Core performance
This commit consists of the following changes with the aim to
improve the Core performance:
1) Use a hybrid approach of pre-route calculation and route
calculation on-demand. For example, attempts to pre-calculate
routes will be done whenever a nni/uni port is discovered. The
attempt may fail if there are not enough ports to generate a
route. When a flow is received and the route is not available
then only the route relevant to that flow will be created on
demand.
2) Changes some of the route calculation flow such that the
process does not need to go and grab the latest version of the
device which could lead to higher latency, expecially if that
device is busy with other processing.
3) Change the logic when reporting added ports to ONOS such that
routes are calculated (at least an attempt made) before sending
a port create notification to ONOS.
4) Move peer port creation into its own go routine thereby
removing the lock on a child device much earlier.
5) Wait until a request for port capability is received before
removing the lock on a device. A better approach is required
where the adapter will need to report the port capability along
with the port creation event. However, this require another
Jira as changes will be required in the API.
6) Remove some unnecessary proto.clones. Those are the obvious
ones. Removal of other proto.clones will be done in a separate
commit.
7) Fix a core panic when concurrent requests are made to the
route map
Change-Id: I2bafc99dbf10d7026572a44af0b88a31b5eb1887
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index fa1f657..0c373ea 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -598,6 +598,11 @@
}
func (nb *NBTest) deleteAllDevices(t *testing.T, nbi *NBIHandler) {
+ devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
+ if len(devices.Items) == 0 {
+ // Nothing to do
+ return
+ }
//Get an OLT device
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
@@ -1080,11 +1085,26 @@
},
}
flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
- _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+ maxTries := 3
+ var err error
+ for {
+ if _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP); err == nil {
+ if maxTries < 3 {
+ t.Log("Re-sending EAPOL flow succeeded for port:", port)
+ }
+ break
+ }
+ t.Log("Sending EAPOL flows fail:", err)
+ time.Sleep(50 * time.Millisecond)
+ maxTries--
+ if maxTries == 0 {
+ break
+ }
+ }
assert.Nil(t, err)
}
-func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup, flowAddFail bool, flowDelete bool) {
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup, flowAddFail bool, flowDeleteFail bool) {
defer wg.Done()
// Clear any existing flows on the adapters
@@ -1092,8 +1112,8 @@
nb.onuAdapter.ClearFlows()
// Set the adapter actions on flow addition/deletion
- nb.oltAdapter.SetFlowAction(flowAddFail, flowDelete)
- nb.onuAdapter.SetFlowAction(flowAddFail, flowDelete)
+ nb.oltAdapter.SetFlowAction(flowAddFail, flowDeleteFail)
+ nb.onuAdapter.SetFlowAction(flowAddFail, flowDeleteFail)
// Wait until a logical device is ready
var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
@@ -1199,7 +1219,7 @@
// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
var wg sync.WaitGroup
wg.Add(1)
- go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, true, true)
+ go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, true, false)
// Create the device with valid data
oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
@@ -1280,7 +1300,7 @@
// 4. Test Enable a device
nb.testEnableDevice(t, nbi)
- // 5. Test disable and ReEnable a root device
+ //// 5. Test disable and ReEnable a root device
nb.testDisableAndReEnableRootDevice(t, nbi)
// 6. Test disable and Enable pon port of OLT device
@@ -1298,6 +1318,9 @@
// 10. Test omci test
nb.testStartOmciTestAction(t, nbi)
+ // 11. Remove all devices from tests above
+ nb.deleteAllDevices(t, nbi)
+
// 11. Test flow add failure
nb.testFlowAddFailure(t, nbi)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 901b27f..c8f03e7 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -240,7 +240,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
- return proto.Clone(agent.device).(*voltha.Device)
+ return agent.device
}
// enableDevice activates a preprovisioned or a disable device
@@ -891,8 +891,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
@@ -1461,6 +1460,7 @@
cloned.Ports = append(cloned.Ports, ponPort)
logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
}
+
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
@@ -1553,13 +1553,12 @@
return errors.New("device agent stopped")
}
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.clusterDataProxy.Update(updateCtx, "devices/"+agent.deviceID, device); err != nil {
+ if err := agent.clusterDataProxy.Update(ctx, "devices/"+agent.deviceID, device); err != nil {
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
- agent.device = proto.Clone(device).(*voltha.Device)
+ agent.device = device
return nil
}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 15eb677..7723b74 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -47,7 +47,6 @@
clusterDataProxy *model.Proxy
stopped bool
deviceRoutes *route.DeviceRoutes
- lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
@@ -66,7 +65,7 @@
}
func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
- deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
+ deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
var agent LogicalAgent
agent.logicalDeviceID = id
agent.serialNumber = sn
@@ -76,11 +75,12 @@
agent.ldeviceMgr = ldeviceMgr
agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
agent.logicalPortsNo = make(map[uint32]bool)
- agent.defaultTimeout = timeout
+ agent.defaultTimeout = defaultTimeout
agent.requestQueue = coreutils.NewRequestQueue()
agent.meters = make(map[uint32]*MeterChunk)
agent.flows = make(map[uint64]*FlowChunk)
agent.groups = make(map[uint32]*GroupChunk)
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return &agent
}
@@ -251,7 +251,7 @@
return nil
}
-func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) addFlowsAndGroupsToDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
@@ -259,10 +259,11 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
+ start := time.Now()
+ if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
response.Done()
@@ -272,7 +273,7 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -280,10 +281,11 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
- logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
+ start := time.Now()
+ if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err, "wait-time": time.Since(start)})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
response.Done()
@@ -292,7 +294,7 @@
return responses
}
-func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -300,9 +302,9 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
@@ -312,7 +314,7 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
for _, flow := range flows.Items {
@@ -327,9 +329,9 @@
}
logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
- ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.deleteParentFlows(ctx, agent.rootDeviceID, uniPort, metadata); err != nil {
+ if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error("flow-delete-failed", log.Fields{"device-id": agent.rootDeviceID, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 5d35251..0e811e7 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -20,8 +20,6 @@
"context"
"errors"
"fmt"
- "strconv"
-
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
@@ -31,6 +29,7 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "strconv"
)
//updateFlowTable updates the flow table of that logical device
@@ -40,9 +39,6 @@
return nil
}
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
switch flow.GetCommand() {
case ofp.OfpFlowModCommand_OFPFC_ADD:
return agent.flowAdd(ctx, flow)
@@ -170,7 +166,8 @@
return changed, updated, err
}
}
- respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
+
+ respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, &flowMetadata)
// Create the go routines to wait
go func() {
// Wait for completion
@@ -218,7 +215,7 @@
}
// Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(deviceRules, metadata)
// Wait for the responses
go func() {
@@ -300,9 +297,9 @@
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: toDelete}, &flowMetadata)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
}
// Wait for the responses
@@ -378,9 +375,9 @@
}
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+ respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: flowsToDelete}, &flowMetadata)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
}
// Wait for completion
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index a0d6c4a..73caa07 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -37,10 +37,6 @@
return nil
}
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
-
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(ctx, groupMod)
@@ -92,7 +88,7 @@
logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
// Update the devices
- respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+ respChnls := agent.addFlowsAndGroupsToDevices(deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
@@ -172,7 +168,7 @@
logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
// Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+ respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, nil)
// Wait for completion
go func() {
@@ -217,7 +213,7 @@
}
// Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+ respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7845ad5..7229e05 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -49,22 +49,36 @@
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
- if port.Type == voltha.Port_ETHERNET_NNI {
+ switch port.Type {
+ case voltha.Port_ETHERNET_NNI:
if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, true)
- } else if port.Type == voltha.Port_ETHERNET_UNI {
+ case voltha.Port_ETHERNET_UNI:
if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
agent.addLogicalPortToMap(port.PortNo, false)
- } else {
- // Update the device routes to ensure all routes on the logical device have been calculated
- if err = agent.buildRoutes(ctx); err != nil {
- // Not an error - temporary state
- logger.Warnw("failed-to-update-routes", log.Fields{"device-id": device.Id, "port": port, "error": err})
- }
+ case voltha.Port_PON_OLT:
+ // Rebuilt the routes on Parent PON port addition
+ go func() {
+ if err = agent.buildRoutes(ctx); err != nil {
+ // Not an error - temporary state
+ logger.Infow("failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ }
+ }()
+ //fallthrough
+ case voltha.Port_PON_ONU:
+ // Add the routes corresponding to that child device
+ go func() {
+ if err = agent.updateAllRoutes(ctx, device); err != nil {
+ // Not an error - temporary state
+ logger.Infow("failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ }
+ }()
+ default:
+ return fmt.Errorf("invalid port type %v", port)
}
return nil
}
@@ -360,13 +374,15 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
+
+ defer agent.requestQueue.RequestComplete()
if agent.portExist(device, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
return false, nil
}
- agent.requestQueue.RequestComplete()
+ // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
+ // request that the Core makes following a port create event.
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -375,17 +391,6 @@
return false, err
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
portCap.Port.RootPort = true
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
lp.DeviceId = device.Id
@@ -402,19 +407,11 @@
}
clonedPorts = append(clonedPorts, lp)
- if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
+ if err = agent.addLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts, lp, device); err != nil {
logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
return false, err
}
- // Update the device routes with this new logical port
- clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
- }
- }()
-
return true, nil
}
@@ -441,13 +438,16 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
+ defer agent.requestQueue.RequestComplete()
if agent.portExist(childDevice, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
return false, nil
}
- agent.requestQueue.RequestComplete()
+
+ // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
+ // request that the Core makes following a port create event.
+
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -455,15 +455,7 @@
logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return false, err
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
+
// Get stored logical device
ldevice := agent.getLogicalDeviceWithoutLock()
@@ -478,17 +470,10 @@
clonedPorts = make([]*voltha.LogicalPort, 0)
}
clonedPorts = append(clonedPorts, portCap.Port)
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
+
+ if err = agent.addLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts, portCap.Port, childDevice); err != nil {
return false, err
}
- // Update the device graph with this new logical port
- clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
return true, nil
}
@@ -508,6 +493,31 @@
return nil
}
+// addLogicalDevicePortsWithoutLock add the new ports to the logical device, update routes associated with those new
+// ports and send an add port event to the OF controller
+func (agent *LogicalAgent) addLogicalDevicePortsWithoutLock(ctx context.Context, lDevice *voltha.LogicalDevice, newPorts []*voltha.LogicalPort, lp *voltha.LogicalPort, device *voltha.Device) error {
+ oldPorts := lDevice.Ports
+ lDevice.Ports = newPorts
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+ return err
+ }
+
+ // Setup the routes for this device and then send the port update event to the OF Controller
+ go func() {
+ // First setup the routes
+ if err := agent.updateRoutes(context.Background(), device, lp, newPorts); err != nil {
+ // This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
+ // created yet.
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+ }
+
+ // Send a port update event
+ agent.portUpdated(oldPorts, newPorts)
+ }()
+
+ return nil
+}
+
// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
newPorts = make(map[string]*voltha.LogicalPort, len(newList))
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 6736160..fa96caf 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -18,30 +18,26 @@
import (
"context"
- "errors"
"fmt"
-
"github.com/opencord/voltha-go/rw_core/route"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
-// GetRoute returns route
+// GetRoute returns a route
func (agent *LogicalAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
logger.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
routes := make([]route.Hop, 0)
// Note: A port value of 0 is equivalent to a nil port
- // Consider different possibilities
+ // Controller-bound flow
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
if agent.isNNIPort(ingressPortNo) {
//This is a trap on the NNI Port
- if len(agent.deviceRoutes.Routes) == 0 {
+ if agent.deviceRoutes.IsRoutesEmpty() {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// route with same IngressHop and EgressHop
hop := route.Hop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
@@ -49,62 +45,43 @@
routes = append(routes, hop)
return routes, nil
}
- //Return a 'half' route to make the flow decomposer logic happy
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
+
+ // Return a 'half' route to make the flow decomposer logic happy
+ routes, err := agent.deviceRoutes.GetHalfRoute(true, 0, 0)
+ if err != nil {
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
- //treat it as if the output port is the first NNI of the OLT
+ // Treat it as if the output port is the first NNI of the OLT
var err error
if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
logger.Warnw("no-nni-port", log.Fields{"error": err})
return nil, err
}
}
- //If ingress port is not specified (nil), it may be a wildcarded
- //route if egress port is OFPP_CONTROLLER or a nni logical port,
- //in which case we need to create a half-route where only the egress
- //hop is filled, the first hop is nil
+
+ //If ingress port is not specified (nil), it may be a wildcarded route if egress port is OFPP_CONTROLLER or a nni
+ // logical port, in which case we need to create a half-route where only the egress hop is filled, the first hop is nil
if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
- // We can use the 2nd hop of any upstream route, so just find the first upstream:
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
+ routes, err := agent.deviceRoutes.GetHalfRoute(true, ingressPortNo, egressPortNo)
+ if err != nil {
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
+
//If egress port is not specified (nil), we can also can return a "half" route
if egressPortNo == 0 {
- for routeLink, path := range agent.deviceRoutes.Routes {
- if routeLink.Ingress == ingressPortNo {
- routes = append(routes, path[0])
- routes = append(routes, route.Hop{})
- return routes, nil
- }
+ routes, err := agent.deviceRoutes.GetHalfRoute(false, ingressPortNo, egressPortNo)
+ if err != nil {
+ return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
- return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ return routes, nil
}
- // Return the pre-calculated route
- return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
-}
-func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
- logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
- for routeLink, route := range agent.deviceRoutes.Routes {
- logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
- if ingress == routeLink.Ingress && egress == routeLink.Egress {
- return route, nil
- }
- }
- return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingress, egress)
+ // Return the pre-calculated route
+ return agent.deviceRoutes.GetRoute(ctx, ingressPortNo, egressPortNo)
}
// GetDeviceRoutes returns device graph
@@ -112,30 +89,6 @@
return agent.deviceRoutes
}
-//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
-//that device graph was generated.
-func (agent *LogicalAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
- agent.lockDeviceRoutes.Lock()
- defer agent.lockDeviceRoutes.Unlock()
-
- ld, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return err
- }
-
- if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
- return nil
- }
- logger.Debug("Generation of device route required")
- if err := agent.buildRoutes(ctx); err != nil {
- // No Route is not an error
- if !errors.Is(err, route.ErrNoRoute) {
- return err
- }
- }
- return nil
-}
-
//rebuildRoutes rebuilds the device routes
func (agent *LogicalAgent) buildRoutes(ctx context.Context) error {
logger.Debugf("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
@@ -160,17 +113,28 @@
}
//updateRoutes updates the device routes
-func (agent *LogicalAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
- logger.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, device *voltha.Device, lp *voltha.LogicalPort, lps []*voltha.LogicalPort) error {
+ logger.Debugw("updateRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "port:": lp})
+
+ if err := agent.deviceRoutes.AddPort(ctx, lp, device, lps); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
- if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+ if err := agent.deviceRoutes.Print(); err != nil {
+ return err
}
- if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
+ return nil
+}
+
+//updateAllRoutes updates the device routes using all the logical ports on that device
+func (agent *LogicalAgent) updateAllRoutes(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw("updateAllRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "ports-count": len(device.Ports)})
+
+ ld, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return err
+ }
+
+ if err := agent.deviceRoutes.AddAllPorts(ctx, device, ld.Ports); err != nil {
return err
}
if err := agent.deviceRoutes.Print(); err != nil {
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 0c4448b..cf1301f 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -725,36 +725,40 @@
return status.Errorf(codes.NotFound, "%s", device.Id)
}
+func (dMgr *Manager) addPeerPort(ctx context.Context, deviceID string, port *voltha.Port) error {
+ meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
+ for _, peerPort := range port.Peers {
+ if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
+ if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
+ return err
+ }
+ }
+ }
+ // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
+ // then a logical port will be added to the logical device and the device route generated. If the port is a
+ // PON port then only the device graph will be generated.
+ device, err := dMgr.getDevice(ctx, deviceID)
+ if err != nil {
+ return err
+ }
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port); err != nil {
+ return err
+ }
+ return nil
+}
+
func (dMgr *Manager) AddPort(ctx context.Context, deviceID string, port *voltha.Port) error {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent != nil {
if err := agent.addPort(ctx, port); err != nil {
return err
}
- // Setup peer ports
- meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceID, PortNo: port.PortNo}
- for _, peerPort := range port.Peers {
- if agent := dMgr.getDeviceAgent(ctx, peerPort.DeviceId); agent != nil {
- if err := agent.addPeerPort(ctx, meAsPeer); err != nil {
- logger.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
- return err
- }
+ // Setup peer ports in its own routine
+ go func() {
+ if err := dMgr.addPeerPort(ctx, deviceID, port); err != nil {
+ logger.Errorw("unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
- }
- // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
- // then a logical port will be added to the logical device and the device graph generated. If the port is a
- // PON port then only the device graph will be generated.
- if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
- go func() {
- err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port)
- if err != nil {
- logger.Errorw("unable-to-update-logical-port", log.Fields{"error": err})
- }
- }()
- } else {
- logger.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceID})
- return err
- }
+ }()
return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 3c29a01..7bad0c2 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -150,7 +150,7 @@
ta.flowLock.Lock()
defer ta.flowLock.Unlock()
- if flows.ToAdd != nil {
+ if flows.ToAdd != nil && len(flows.ToAdd.Items) > 0 {
if ta.failFlowAdd {
return fmt.Errorf("flow-add-error")
}
@@ -158,7 +158,7 @@
ta.flows[f.Id] = f
}
}
- if flows.ToRemove != nil {
+ if flows.ToRemove != nil && len(flows.ToRemove.Items) > 0 {
if ta.failFlowDelete {
return fmt.Errorf("flow-delete-error")
}
diff --git a/rw_core/route/device_route.go b/rw_core/route/device_route.go
index 292ef83..71002c7 100644
--- a/rw_core/route/device_route.go
+++ b/rw_core/route/device_route.go
@@ -54,13 +54,13 @@
type DeviceRoutes struct {
logicalDeviceID string
getDeviceFromModel GetDeviceFunc
- logicalPorts []*voltha.LogicalPort
+ logicalPorts map[uint32]*voltha.LogicalPort
RootPorts map[uint32]uint32
rootPortsLock sync.RWMutex
Routes map[PathID][]Hop
routeBuildLock sync.RWMutex
devicesPonPorts map[string][]*voltha.Port
- devicesPonPortsLock sync.RWMutex
+ childConnectionPort map[string]uint32
}
// NewDeviceRoutes creates device graph instance
@@ -71,6 +71,8 @@
dr.RootPorts = make(map[uint32]uint32)
dr.Routes = make(map[PathID][]Hop)
dr.devicesPonPorts = make(map[string][]*voltha.Port)
+ dr.childConnectionPort = make(map[string]uint32)
+ dr.logicalPorts = make(map[uint32]*voltha.LogicalPort)
logger.Debug("new device routes created ...")
return &dr
}
@@ -83,6 +85,39 @@
return exist
}
+func (dr *DeviceRoutes) GetRoute(ctx context.Context, ingress, egress uint32) ([]Hop, error) {
+ dr.routeBuildLock.Lock()
+ defer dr.routeBuildLock.Unlock()
+
+ if route, exist := dr.Routes[PathID{Ingress: ingress, Egress: egress}]; exist {
+ return route, nil
+ }
+
+ uniPort, nniPort, err := dr.getLogicalPorts(ingress, egress)
+ if err != nil {
+ return nil, fmt.Errorf("no route from:%d to:%d %w", ingress, egress, err)
+ }
+
+ childPonPort, err := dr.getChildPonPort(ctx, uniPort.DeviceId)
+ if err != nil {
+ return nil, err
+ }
+ rootDevicePonPort, err := dr.getParentPonPort(ctx, nniPort.DeviceId, uniPort.DeviceId)
+ if err != nil {
+ return nil, err
+ }
+
+ dr.Routes[PathID{Ingress: nniPort.OfpPort.PortNo, Egress: uniPort.DevicePortNo}] = []Hop{
+ {DeviceID: nniPort.DeviceId, Ingress: nniPort.DevicePortNo, Egress: rootDevicePonPort},
+ {DeviceID: uniPort.DeviceId, Ingress: childPonPort, Egress: uniPort.DevicePortNo},
+ }
+ dr.Routes[PathID{Ingress: uniPort.DevicePortNo, Egress: nniPort.OfpPort.PortNo}] = getReverseRoute(
+ dr.Routes[PathID{Ingress: nniPort.OfpPort.PortNo, Egress: uniPort.DevicePortNo}])
+
+ return dr.Routes[PathID{Ingress: ingress, Egress: egress}], nil
+
+}
+
//ComputeRoutes calculates all the routes between the logical ports. This will clear up any existing route
func (dr *DeviceRoutes) ComputeRoutes(ctx context.Context, lps []*voltha.LogicalPort) error {
dr.routeBuildLock.Lock()
@@ -102,7 +137,6 @@
}
dr.reset()
- dr.logicalPorts = append(dr.logicalPorts, lps...)
// Setup the physical ports to logical ports map, the nni ports as well as the root ports map
physPortToLogicalPortMap := make(map[string]uint32)
@@ -113,7 +147,9 @@
nniPorts = append(nniPorts, lp)
dr.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
}
+ dr.logicalPorts[lp.OfpPort.PortNo] = lp
}
+
if len(nniPorts) == 0 {
return fmt.Errorf("no nni port :%w", ErrNoRoute)
}
@@ -128,7 +164,7 @@
return nil
}
// Get root device
- rootDevice, err = dr.getDevice(ctx, nniPort.DeviceId)
+ rootDevice, err = dr.getDeviceWithCacheUpdate(ctx, nniPort.DeviceId)
if err != nil {
return err
}
@@ -140,17 +176,14 @@
if rootDevicePort.Type == voltha.Port_PON_OLT {
logger.Debugw("peers", log.Fields{"root-device-id": rootDevice.Id, "port-no": rootDevicePort.PortNo, "len-peers": len(rootDevicePort.Peers)})
for _, rootDevicePeer := range rootDevicePort.Peers {
- childDevice, err = dr.getDevice(ctx, rootDevicePeer.DeviceId)
+ childDevice, err = dr.getDeviceWithCacheUpdate(ctx, rootDevicePeer.DeviceId)
if err != nil {
return err
}
- childPonPorts := dr.getDevicePonPorts(childDevice.Id, nniPort.DeviceId)
- if len(childPonPorts) < 1 {
- err = status.Errorf(codes.Aborted, "no-child-pon-port-%s", childDevice.Id)
+ childPonPort, err := dr.getChildPonPort(ctx, childDevice.Id)
+ if err != nil {
return err
}
- // We use the first PON port on the ONU whose parent is the root device.
- childPonPort := childPonPorts[0].PortNo
for _, childDevicePort := range childDevice.Ports {
if childDevicePort.Type == voltha.Port_ETHERNET_UNI {
childLogicalPort, exist := physPortToLogicalPortMap[concatDeviceIDPortID(childDevice.Id, childDevicePort.PortNo)]
@@ -173,102 +206,124 @@
return nil
}
-// verifyPrecondition verify whether the preconditions are met to proceed with addition of the new logical port
-func (dr *DeviceRoutes) addPortAndVerifyPrecondition(lp *voltha.LogicalPort) error {
- var exist, nniLogicalPortExist, uniLogicalPortExist bool
- for _, existingLogicalPort := range dr.logicalPorts {
- nniLogicalPortExist = nniLogicalPortExist || existingLogicalPort.RootPort
- uniLogicalPortExist = uniLogicalPortExist || !existingLogicalPort.RootPort
- exist = exist || existingLogicalPort.OfpPort.PortNo == lp.OfpPort.PortNo
- if nniLogicalPortExist && uniLogicalPortExist && exist {
- break
- }
- }
- if !exist {
- dr.logicalPorts = append(dr.logicalPorts, lp)
- nniLogicalPortExist = nniLogicalPortExist || lp.RootPort
- uniLogicalPortExist = uniLogicalPortExist || !lp.RootPort
+// AddPort augments the current set of routes with new routes corresponding to the logical port "lp". If the routes have
+// not been built yet then use logical port "lps" to compute all current routes (lps includes lp)
+func (dr *DeviceRoutes) AddPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+ logger.Debugw("add-port-to-routes", log.Fields{"port": lp, "count-logical-ports": len(lps)})
+
+ // Adding NNI port
+ if lp.RootPort {
+ return dr.AddNNIPort(ctx, lp, device, lps)
}
- // If we do not have both NNI and UNI ports then return an error
- if !(nniLogicalPortExist && uniLogicalPortExist) {
- fmt.Println("errors", nniLogicalPortExist, uniLogicalPortExist)
- return status.Error(codes.FailedPrecondition, "no-uni-and-nni-ports-combination")
+ // Adding UNI port
+ return dr.AddUNIPort(ctx, lp, device, lps)
+}
+
+// AddUNIPort setup routes between the logical UNI port lp and all registered NNI ports
+func (dr *DeviceRoutes) AddUNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+ logger.Debugw("add-uni-port-to-routes", log.Fields{"port": lp, "count-logical-ports": len(lps)})
+
+ dr.routeBuildLock.Lock()
+ defer dr.routeBuildLock.Unlock()
+
+ // Add port to logical ports
+ dr.logicalPorts[lp.OfpPort.PortNo] = lp
+
+ // Update internal structures with device data
+ dr.updateCache(device)
+
+ // Adding a UNI port
+ childPonPort, err := dr.getChildPonPort(ctx, lp.DeviceId)
+ if err != nil {
+ return err
+ }
+ rootDevicePonPort, err := dr.getParentPonPort(ctx, device.ParentId, device.Id)
+ if err != nil {
+ return err
+ }
+
+ // Adding a UNI port
+ for _, lPort := range lps {
+ if lPort.RootPort {
+ dr.Routes[PathID{Ingress: lPort.OfpPort.PortNo, Egress: lp.OfpPort.PortNo}] = []Hop{
+ {DeviceID: lPort.DeviceId, Ingress: lPort.DevicePortNo, Egress: rootDevicePonPort},
+ {DeviceID: lp.DeviceId, Ingress: childPonPort, Egress: lp.DevicePortNo},
+ }
+ dr.Routes[PathID{Ingress: lp.OfpPort.PortNo, Egress: lPort.OfpPort.PortNo}] = getReverseRoute(
+ dr.Routes[PathID{Ingress: lPort.OfpPort.PortNo, Egress: lp.OfpPort.PortNo}])
+ }
}
return nil
}
-// AddPort augments the current set of routes with new routes corresponding to the logical port "lp". If the routes have
-// not been built yet then use logical port "lps" to compute all current routes (lps includes lp)
-func (dr *DeviceRoutes) AddPort(ctx context.Context, lp *voltha.LogicalPort, lps []*voltha.LogicalPort) error {
- logger.Debugw("add-port-to-routes", log.Fields{"port": lp, "len-logical-ports": len(lps)})
+// AddNNIPort setup routes between the logical NNI port lp and all registered UNI ports
+func (dr *DeviceRoutes) AddNNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+ logger.Debugw("add-port-to-routes", log.Fields{"port": lp, "logical-ports-count": len(lps), "device-id": device.Id})
dr.routeBuildLock.Lock()
- if len(dr.Routes) == 0 {
- dr.routeBuildLock.Unlock()
- return dr.ComputeRoutes(ctx, lps)
- }
-
- // A set of routes exists
- if err := dr.addPortAndVerifyPrecondition(lp); err != nil {
- dr.reset()
- dr.routeBuildLock.Unlock()
- return err
- }
-
defer dr.routeBuildLock.Unlock()
- // Update the set of root ports, if applicable
- if lp.RootPort {
- dr.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
+
+ // Update internal structures with device data
+ dr.updateCache(device)
+
+ // Setup the physical ports to logical ports map, the nni ports as well as the root ports map
+ physPortToLogicalPortMap := make(map[string]uint32)
+ for _, lp := range lps {
+ physPortToLogicalPortMap[concatDeviceIDPortID(lp.DeviceId, lp.DevicePortNo)] = lp.OfpPort.PortNo
+ if lp.RootPort {
+ dr.rootPortsLock.Lock()
+ dr.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
+ dr.rootPortsLock.Unlock()
+ }
+ dr.logicalPorts[lp.OfpPort.PortNo] = lp
}
- var copyFromNNIPort *voltha.LogicalPort
- // Setup the physical ports to logical ports map
- nniPorts := make([]*voltha.LogicalPort, 0)
- for _, lport := range dr.logicalPorts {
- if lport.RootPort {
- nniPorts = append(nniPorts, lport)
- if copyFromNNIPort == nil && lport.OfpPort.PortNo != lp.OfpPort.PortNo {
- copyFromNNIPort = lport
+ for _, rootDevicePort := range device.Ports {
+ if rootDevicePort.Type == voltha.Port_PON_OLT {
+ logger.Debugw("peers", log.Fields{"root-device-id": device.Id, "port-no": rootDevicePort.PortNo, "len-peers": len(rootDevicePort.Peers)})
+ for _, rootDevicePeer := range rootDevicePort.Peers {
+ childDevice, err := dr.getDeviceWithCacheUpdate(ctx, rootDevicePeer.DeviceId)
+ if err != nil {
+ continue
+ }
+
+ childPonPort, err := dr.getChildPonPort(ctx, childDevice.Id)
+ if err != nil {
+ continue
+ }
+
+ for _, childDevicePort := range childDevice.Ports {
+ childLogicalPort, exist := physPortToLogicalPortMap[concatDeviceIDPortID(childDevice.Id, childDevicePort.PortNo)]
+ if !exist {
+ // This can happen if this logical port has not been created yet for that device
+ continue
+ }
+
+ if childDevicePort.Type == voltha.Port_ETHERNET_UNI {
+ dr.Routes[PathID{Ingress: lp.OfpPort.PortNo, Egress: childLogicalPort}] = []Hop{
+ {DeviceID: device.Id, Ingress: lp.DevicePortNo, Egress: rootDevicePort.PortNo},
+ {DeviceID: childDevice.Id, Ingress: childPonPort, Egress: childDevicePort.PortNo},
+ }
+ dr.Routes[PathID{Ingress: childLogicalPort, Egress: lp.OfpPort.PortNo}] = getReverseRoute(
+ dr.Routes[PathID{Ingress: lp.OfpPort.PortNo, Egress: childLogicalPort}])
+ }
+ }
}
}
}
+ return nil
+}
- if copyFromNNIPort == nil {
- // Trying to add the same NNI port. Just return
- return nil
- }
-
- // Adding NNI Port? If we are here we already have an NNI port with a set of routes. Just copy the existing
- // routes using an existing NNI port
- if lp.RootPort {
- dr.copyFromExistingNNIRoutes(lp, copyFromNNIPort)
- return nil
- }
-
- // Adding a UNI port
- for _, nniPort := range nniPorts {
- childPonPorts := dr.getDevicePonPorts(lp.DeviceId, nniPort.DeviceId)
- if len(childPonPorts) == 0 || len(childPonPorts[0].Peers) == 0 {
- // Ports may not have been cached yet - get the device info which sets the PON port cache
- if _, err := dr.getDevice(ctx, lp.DeviceId); err != nil {
- dr.reset()
+// AddAllPorts setups up new routes using all ports on the device. lps includes the device's logical port
+func (dr *DeviceRoutes) AddAllPorts(ctx context.Context, device *voltha.Device, lps []*voltha.LogicalPort) error {
+ logger.Debugw("add-all-port-to-routes", log.Fields{"logical-ports-count": len(lps), "device-id": device.Id})
+ for _, lp := range lps {
+ if lp.DeviceId == device.Id {
+ if err := dr.AddPort(ctx, lp, device, lps); err != nil {
return err
}
- childPonPorts = dr.getDevicePonPorts(lp.DeviceId, nniPort.DeviceId)
- if len(childPonPorts) == 0 || len(childPonPorts[0].Peers) == 0 {
- dr.reset()
- return status.Errorf(codes.FailedPrecondition, "no-pon-ports-%s", lp.DeviceId)
- }
}
- // We use the first PON port on the child device
- childPonPort := childPonPorts[0]
- dr.Routes[PathID{Ingress: nniPort.OfpPort.PortNo, Egress: lp.OfpPort.PortNo}] = []Hop{
- {DeviceID: nniPort.DeviceId, Ingress: nniPort.DevicePortNo, Egress: childPonPort.Peers[0].PortNo},
- {DeviceID: lp.DeviceId, Ingress: childPonPort.PortNo, Egress: lp.DevicePortNo},
- }
- dr.Routes[PathID{Ingress: lp.OfpPort.PortNo, Egress: nniPort.OfpPort.PortNo}] = getReverseRoute(
- dr.Routes[PathID{Ingress: nniPort.OfpPort.PortNo, Egress: lp.OfpPort.PortNo}])
}
return nil
}
@@ -298,8 +353,8 @@
return nil
}
-// IsUpToDate returns true if device is up to date
-func (dr *DeviceRoutes) IsUpToDate(ld *voltha.LogicalDevice) bool {
+// isUpToDate returns true if device is up to date
+func (dr *DeviceRoutes) isUpToDate(ld *voltha.LogicalDevice) bool {
dr.routeBuildLock.Lock()
defer dr.routeBuildLock.Unlock()
numNNI, numUNI := 0, 0
@@ -313,40 +368,53 @@
return len(dr.Routes) == numNNI*numUNI*2
}
-// getDevicePonPorts returns all the PON ports of a device whose peer device ID is peerDeviceID
-func (dr *DeviceRoutes) getDevicePonPorts(deviceID string, peerDeviceID string) []*voltha.Port {
- dr.devicesPonPortsLock.RLock()
- defer dr.devicesPonPortsLock.RUnlock()
- ponPorts := make([]*voltha.Port, 0)
- ports, exist := dr.devicesPonPorts[deviceID]
- if !exist {
- return ponPorts
- }
- //fmt.Println("getDevicePonPorts", deviceID, peerDeviceID, ports)
- for _, port := range ports {
- for _, peer := range port.Peers {
- if peer.DeviceId == peerDeviceID {
- ponPorts = append(ponPorts, port)
+// IsRoutesEmpty returns true if there are no routes
+func (dr *DeviceRoutes) IsRoutesEmpty() bool {
+ dr.routeBuildLock.RLock()
+ defer dr.routeBuildLock.RUnlock()
+ return len(dr.Routes) == 0
+}
+
+// GetHalfRoute returns a half route that has only the egress hop set or the ingress hop set
+func (dr *DeviceRoutes) GetHalfRoute(nniAsEgress bool, ingress, egress uint32) ([]Hop, error) {
+ dr.routeBuildLock.RLock()
+ defer dr.routeBuildLock.RUnlock()
+ routes := make([]Hop, 0)
+ for routeLink, path := range dr.Routes {
+ // If nniAsEgress is set then the half route will only have the egress hop set where the egress port needs to be
+ // an NNI port
+ if nniAsEgress {
+ // Prioritize a specific egress NNI port if set
+ if egress != 0 && dr.IsRootPort(egress) && routeLink.Egress == egress {
+ routes = append(routes, Hop{})
+ routes = append(routes, path[1])
+ return routes, nil
+ }
+ if egress == 0 && dr.IsRootPort(routeLink.Egress) {
+ routes = append(routes, Hop{})
+ routes = append(routes, path[1])
+ return routes, nil
+ }
+ } else {
+ // Here we use the first route whose ingress port matches the ingress input parameter
+ if ingress != 0 && routeLink.Ingress == ingress {
+ routes = append(routes, path[0])
+ routes = append(routes, Hop{})
+ return routes, nil
}
}
}
- return ponPorts
+ return routes, fmt.Errorf("no half route found for ingress port %d, egress port %d and nni as egress %t", ingress, egress, nniAsEgress)
}
-//getDevice returns the from the model and updates the PON ports map of that device.
-func (dr *DeviceRoutes) getDevice(ctx context.Context, deviceID string) (*voltha.Device, error) {
+//getDeviceWithCacheUpdate returns the from the model and updates the PON ports map of that device.
+func (dr *DeviceRoutes) getDeviceWithCacheUpdate(ctx context.Context, deviceID string) (*voltha.Device, error) {
device, err := dr.getDeviceFromModel(ctx, deviceID)
if err != nil {
logger.Errorw("device-not-found", log.Fields{"deviceId": deviceID, "error": err})
return nil, err
}
- dr.devicesPonPortsLock.Lock()
- defer dr.devicesPonPortsLock.Unlock()
- for _, port := range device.Ports {
- if port.Type == voltha.Port_PON_ONU || port.Type == voltha.Port_PON_OLT {
- dr.devicesPonPorts[device.Id] = append(dr.devicesPonPorts[device.Id], port)
- }
- }
+ dr.updateCache(device)
return device, nil
}
@@ -376,12 +444,10 @@
dr.rootPortsLock.Lock()
dr.RootPorts = make(map[uint32]uint32)
dr.rootPortsLock.Unlock()
- // Do not numGetDeviceCalledLock Routes, logicalPorts as the callee function already holds its numGetDeviceCalledLock.
dr.Routes = make(map[PathID][]Hop)
- dr.logicalPorts = make([]*voltha.LogicalPort, 0)
- dr.devicesPonPortsLock.Lock()
+ dr.logicalPorts = make(map[uint32]*voltha.LogicalPort)
dr.devicesPonPorts = make(map[string][]*voltha.Port)
- dr.devicesPonPortsLock.Unlock()
+ dr.childConnectionPort = make(map[string]uint32)
}
//concatDeviceIdPortId formats a portid using the device id and the port number
@@ -397,3 +463,82 @@
}
return reverse
}
+
+// getChildPonPort returns the child PON port number either from cache or from the model. If it is from the model then
+// it updates the PON ports map of that device.
+func (dr *DeviceRoutes) getChildPonPort(ctx context.Context, deviceID string) (uint32, error) {
+ if port, exist := dr.devicesPonPorts[deviceID]; exist {
+ // Return only the first PON port of that child device
+ return port[0].PortNo, nil
+ }
+
+ // Get child device from model
+ if _, err := dr.getDeviceWithCacheUpdate(ctx, deviceID); err != nil {
+ logger.Errorw("device-not-found", log.Fields{"device-id": deviceID, "error": err})
+ return 0, err
+ }
+
+ // Try again
+ if port, exist := dr.devicesPonPorts[deviceID]; exist {
+ // Return only the first PON port of that child device
+ return port[0].PortNo, nil
+ }
+
+ return 0, fmt.Errorf("pon port not found %s", deviceID)
+}
+
+// getParentPonPort returns the parent PON port of the child device
+func (dr *DeviceRoutes) getParentPonPort(ctx context.Context, deviceID string, childDeviceID string) (uint32, error) {
+ if pNo, exist := dr.childConnectionPort[childDeviceID]; exist {
+ return pNo, nil
+ }
+
+ // Get parent device from the model
+ if _, err := dr.getDeviceWithCacheUpdate(ctx, deviceID); err != nil {
+ logger.Errorw("device-not-found", log.Fields{"deviceId": deviceID, "error": err})
+ return 0, err
+ }
+ // Try again
+ if pNo, exist := dr.childConnectionPort[childDeviceID]; exist {
+ return pNo, nil
+ }
+ return 0, fmt.Errorf("pon port associated with child device %s not found", childDeviceID)
+}
+
+func (dr *DeviceRoutes) updateCache(device *voltha.Device) {
+ for _, port := range device.Ports {
+ if port.Type == voltha.Port_PON_ONU || port.Type == voltha.Port_PON_OLT {
+ dr.devicesPonPorts[device.Id] = append(dr.devicesPonPorts[device.Id], port)
+ for _, peer := range port.Peers {
+ if port.Type == voltha.Port_PON_ONU {
+ dr.childConnectionPort[port.DeviceId] = peer.PortNo
+ } else {
+ dr.childConnectionPort[peer.DeviceId] = port.PortNo
+ }
+ }
+ }
+ }
+}
+
+func (dr *DeviceRoutes) getLogicalPorts(ingress, egress uint32) (uniPort, nniPort *voltha.LogicalPort, err error) {
+ inPort, exist := dr.logicalPorts[ingress]
+ if !exist {
+ err = fmt.Errorf("ingress port %d not found", ingress)
+ return
+ }
+ outPort, exist := dr.logicalPorts[egress]
+ if !exist {
+ err = fmt.Errorf("egress port %d not found", egress)
+ return
+ }
+
+ if inPort.RootPort {
+ nniPort = inPort
+ uniPort = outPort
+ } else {
+ nniPort = outPort
+ uniPort = inPort
+ }
+
+ return
+}
diff --git a/rw_core/route/device_route_test.go b/rw_core/route/device_route_test.go
index 1f90ecd..fbbc802 100644
--- a/rw_core/route/device_route_test.go
+++ b/rw_core/route/device_route_test.go
@@ -35,6 +35,10 @@
oltDeviceID = "olt"
)
+const testSetupPhase contextKey = "testSetupPhase"
+
+type contextKey string
+
//portRegistration is a message sent from an OLT device to a logical device to create a logical port
type portRegistration struct {
port *voltha.Port
@@ -82,7 +86,11 @@
}
ldM.logicalDevice.Ports = append(ldM.logicalDevice.Ports, lp)
if buildRoutes {
- err := ldM.deviceRoutes.AddPort(context.Background(), lp, ldM.logicalDevice.Ports)
+ device, err := getDevice(context.WithValue(context.Background(), testSetupPhase, true), lp.DeviceId)
+ if err != nil {
+ fmt.Println("Error when getting device:", lp.DeviceId, err)
+ }
+ err = ldM.deviceRoutes.AddPort(context.Background(), lp, device, ldM.logicalDevice.Ports)
if err != nil && !strings.Contains(err.Error(), "code = FailedPrecondition") {
fmt.Println("(Error when adding port:", lp, len(ldM.logicalDevice.Ports), err)
}
@@ -225,10 +233,12 @@
return nil
}
-func (onuM *onuManager) GetDeviceHelper(_ context.Context, id string) (*voltha.Device, error) {
- onuM.numGetDeviceInvokedLock.Lock()
- onuM.numGetDeviceInvoked++
- onuM.numGetDeviceInvokedLock.Unlock()
+func (onuM *onuManager) GetDeviceHelper(ctx context.Context, id string) (*voltha.Device, error) {
+ if ctx.Value(testSetupPhase) != true {
+ onuM.numGetDeviceInvokedLock.Lock()
+ onuM.numGetDeviceInvoked++
+ onuM.numGetDeviceInvokedLock.Unlock()
+ }
if id == oltDeviceID {
return onuM.oltMgr.olt, nil
}
@@ -241,7 +251,7 @@
func TestDeviceRoutes_ComputeRoutes(t *testing.T) {
numNNIPort := 2
numPonPortOnOlt := 8
- numOnuPerOltPonPort := 32
+ numOnuPerOltPonPort := 256
numUniPerOnu := 4
done := make(chan struct{})
@@ -272,7 +282,7 @@
assert.Nil(t, err)
// Validate the routes are up to date
- assert.True(t, ldMgr.deviceRoutes.IsUpToDate(ld))
+ assert.True(t, ldMgr.deviceRoutes.isUpToDate(ld))
// Validate the expected number of routes
assert.EqualValues(t, 2*numNNIPort*numPonPortOnOlt*numOnuPerOltPonPort*numUniPerOnu, len(ldMgr.deviceRoutes.Routes))
@@ -286,8 +296,8 @@
func TestDeviceRoutes_AddPort(t *testing.T) {
numNNIPort := 2
- numPonPortOnOlt := 8
- numOnuPerOltPonPort := 32
+ numPonPortOnOlt := 16
+ numOnuPerOltPonPort := 256
numUniPerOnu := 4
done := make(chan struct{})
@@ -316,7 +326,7 @@
ldMgr.deviceRoutes.Print()
// Validate the routes are up to date
- assert.True(t, ldMgr.deviceRoutes.IsUpToDate(ld))
+ assert.True(t, ldMgr.deviceRoutes.isUpToDate(ld))
// Validate the expected number of routes
assert.EqualValues(t, 2*numNNIPort*numPonPortOnOlt*numOnuPerOltPonPort*numUniPerOnu, len(ldMgr.deviceRoutes.Routes))