VOL-3121 - Separated out logical ports from logical agent.
Similar to flows/groups/meters.
Also modified device_route tests to generate unique port IDs (`.OfpPort.PortNo`s) across all UNI ports withing each test, i.e. within an OLT.
Also replaced logicalPortsNo map & associated NNI vs UNI logic with root device checks.
Change-Id: Ib0cecbf7d4f8d509ce7c989b9ccf697c8b0d17d6
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7229e05..ee4e77d 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -19,51 +19,46 @@
import (
"context"
"fmt"
+ "sync"
- "github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- ic "github.com/opencord/voltha-protos/v3/go/inter_container"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-// ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
- logger.Debug("ListLogicalDevicePorts")
- logicalDevice, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return nil, err
+// listLogicalDevicePorts returns logical device ports
+func (agent *LogicalAgent) listLogicalDevicePorts() map[uint32]*voltha.LogicalPort {
+ logger.Debug("listLogicalDevicePorts")
+ portIDs := agent.portLoader.ListIDs()
+ ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
+ for portID := range portIDs {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ ret[portID] = portHandle.GetReadOnly()
+ portHandle.Unlock()
+ }
}
- if logicalDevice == nil {
- return &voltha.LogicalPorts{}, nil
- }
- lPorts := make([]*voltha.LogicalPort, 0)
- lPorts = append(lPorts, logicalDevice.Ports...)
- return &voltha.LogicalPorts{Items: lPorts}, nil
+ return ret
}
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
- var err error
switch port.Type {
case voltha.Port_ETHERNET_NNI:
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, true)
case voltha.Port_ETHERNET_UNI:
- if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, false)
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
go func() {
- if err = agent.buildRoutes(ctx); err != nil {
+ if err := agent.buildRoutes(ctx); err != nil {
// Not an error - temporary state
logger.Infow("failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
}
@@ -72,7 +67,7 @@
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err = agent.updateAllRoutes(ctx, device); err != nil {
+ if err := agent.updateAllRoutes(ctx, device); err != nil {
// Not an error - temporary state
logger.Infow("failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
}
@@ -134,89 +129,77 @@
//Get UNI port number
for _, port := range device.Ports {
if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+ if err = agent.addNNILogicalPort(ctx, device, port); err != nil {
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
- agent.addLogicalPortToMap(port.PortNo, true)
}
}
return err
}
// updatePortState updates the port state of the device
-func (agent *LogicalAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortState(ctx context.Context, portNo uint32, operStatus voltha.OperStatus_Types) error {
logger.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
+ }
+ defer portHandle.Unlock()
+
+ newPort := clonePortSetState(portHandle.GetReadOnly(), operStatus)
+ if err := portHandle.Update(ctx, newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == deviceID && port.DevicePortNo == portNo {
- if operStatus == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- }
- // Update the logical device
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return err
- }
- return nil
- }
- }
- return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+ return nil
}
// updatePortsState updates the ports state related to the device
-func (agent *LogicalAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == device.Id {
- if state == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+
+ for portNo := range agent.portLoader.ListIDsForDevice(deviceID) {
+ if portHandle, have := agent.portLoader.Lock(portNo); have {
+ newPort := clonePortSetState(portHandle.GetReadOnly(), state)
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ portHandle.Unlock()
+ return err
}
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+
+ portHandle.Unlock()
}
}
- // Updating the logical device will trigger the poprt change events to be populated to the controller
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
return nil
}
+func clonePortSetState(oldPort *voltha.LogicalPort, state voltha.OperStatus_Types) *voltha.LogicalPort {
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ if state == voltha.OperStatus_ACTIVE {
+ newOfpPort.Config = newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ newOfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ } else {
+ newOfpPort.Config = newOfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ newOfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ }
+ return &newPort
+}
+
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
logger.Infow("setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
- var added bool
//Get UNI port number
for _, port := range childDevice.Ports {
if port.Type == voltha.Port_ETHERNET_UNI {
- if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
+ if err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
- if added {
- agent.addLogicalPortToMap(port.PortNo, false)
- }
}
}
return err
@@ -225,86 +208,52 @@
// deleteAllLogicalPorts deletes all logical ports associated with this logical device
func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- return nil
-}
-
-// deleteLogicalPort removes the logical port
-func (agent *LogicalAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPort.Id {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- if index < len(clonedPorts)-1 {
- copy(clonedPorts[index:], clonedPorts[index+1:])
- }
- clonedPorts[len(clonedPorts)-1] = nil
- clonedPorts = clonedPorts[:len(clonedPorts)-1]
- logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
-
- // Remove the logical port from cache
- agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
- // Reset the logical device routes
- go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
- logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ // for each port
+ for portID := range agent.portLoader.ListIDs() {
+ // TODO: can just call agent.deleteLogicalPort()?
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ oldPort := portHandle.GetReadOnly()
+ // delete
+ err := portHandle.Delete(ctx)
+ portHandle.Unlock()
+ if err != nil {
+ return err
}
- }()
+ // and send event
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_DELETE, oldPort.OfpPort)
+ }
}
+
+ // Reset the logical device routes
+ go func() {
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ }
+ }()
return nil
}
// deleteLogicalPorts removes the logical ports associated with that deviceId
func (agent *LogicalAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- lPortstoKeep := []*voltha.LogicalPort{}
- lPortsNoToDelete := []uint32{}
- for _, logicalPort := range logicalDevice.Ports {
- if logicalPort.DeviceId != deviceID {
- lPortstoKeep = append(lPortstoKeep, logicalPort)
- } else {
- lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
+ // for each port
+ for portNo := range agent.portLoader.ListIDsForDevice(deviceID) {
+ if portHandle, have := agent.portLoader.Lock(portNo); have {
+ // if belongs to this device
+ if oldPort := portHandle.GetReadOnly(); oldPort.DeviceId == deviceID {
+ // delete
+ if err := portHandle.Delete(ctx); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+ // and send event
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_DELETE, oldPort.OfpPort)
+ }
+ portHandle.Unlock()
}
}
- logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
- return err
- }
- // Remove the port from the cached logical ports set
- agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
// Reset the logical device routes
go func() {
@@ -312,335 +261,286 @@
logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
-
return nil
}
// enableLogicalPort enables the logical port
-func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortNo uint32) error {
+ portHandle, have := agent.portLoader.Lock(lPortNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", lPortNo)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ newOfpPort.Config = newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-// disableLogicalPort disabled the logical port
-func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- // Get the most up to date logical device
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- defer agent.requestQueue.RequestComplete()
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
- // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
- // request that the Core makes following a port create event.
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
-
- portCap.Port.RootPort = true
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- lp.DeviceId = device.Id
- lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
- lp.OfpPort.PortNo = port.PortNo
- lp.OfpPort.Name = lp.Id
- lp.DevicePortNo = port.PortNo
-
- ld := agent.getLogicalDeviceWithoutLock()
-
- clonedPorts := clonePorts(ld.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, lp)
-
- if err = agent.addLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts, lp, device); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return false, err
- }
-
- return true, nil
-}
-
-func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
- ldevice := agent.getLogicalDeviceWithoutLock()
- for _, lPort := range ldevice.Ports {
- if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo {
- return true
- }
- }
- return false
-}
-
-// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
- logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
- return false, nil
- }
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- defer agent.requestQueue.RequestComplete()
-
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
- // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
- // request that the Core makes following a port create event.
-
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
-
- // Get stored logical device
- ldevice := agent.getLogicalDeviceWithoutLock()
-
- logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- portCap.Port.RootPort = false
- portCap.Port.Id = port.Label
- portCap.Port.OfpPort.PortNo = port.PortNo
- portCap.Port.DeviceId = childDevice.Id
- portCap.Port.DevicePortNo = port.PortNo
- clonedPorts := clonePorts(ldevice.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, portCap.Port)
-
- if err = agent.addLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts, portCap.Port, childDevice); err != nil {
- return false, err
- }
-
- return true, nil
-}
-
-func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
- return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
-}
-
-//updateLogicalDevicePortsWithoutLock updates the
-func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
- oldPorts := device.Ports
- device.Ports = newPorts
- if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
- return err
- }
- agent.portUpdated(oldPorts, newPorts)
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
return nil
}
-// addLogicalDevicePortsWithoutLock add the new ports to the logical device, update routes associated with those new
-// ports and send an add port event to the OF controller
-func (agent *LogicalAgent) addLogicalDevicePortsWithoutLock(ctx context.Context, lDevice *voltha.LogicalDevice, newPorts []*voltha.LogicalPort, lp *voltha.LogicalPort, device *voltha.Device) error {
- oldPorts := lDevice.Ports
- lDevice.Ports = newPorts
- if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+// disableLogicalPort disabled the logical port
+func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortNo uint32) error {
+ portHandle, have := agent.portLoader.Lock(lPortNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", lPortNo)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ newOfpPort.Config = (newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+ return nil
+}
+
+// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+ logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+
+ label := fmt.Sprintf("nni-%d", port.PortNo)
+ tmpPort := &voltha.LogicalPort{
+ RootPort: true,
+ DeviceId: device.Id,
+ Id: label,
+ DevicePortNo: port.PortNo,
+ OfpPort: &voltha.OfpPort{
+ PortNo: port.PortNo,
+ Name: label,
+ },
+ OfpPortStats: &ofp.OfpPortStats{},
+ }
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ if err != nil {
+ return err
+ }
+ defer portHandle.Unlock()
+
+ if !created {
+ logger.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
+ // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
+ // the port capability request that the Core makes following a port create event.
+ // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
+ // external request runs), this is a temporary hack to avoid updating port state before the port is ready
+
+ // First get the port capability
+ portCap, err := agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo)
+ if err != nil {
+ logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+ return err
+ }
+
+ newPort := portCap.Port
+ newPort.RootPort = true
+ newPort.DeviceId = device.Id
+ newPort.Id = label
+ newPort.DevicePortNo = port.PortNo
+ newPort.OfpPort.PortNo = port.PortNo
+ newPort.OfpPort.Name = label
+
+ // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
+ // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
+ // while doing this, and can be removed later.
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ if err := portHandle.Delete(context.Background()); err != nil {
+ return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
+ }
+ return err
+ }
+
+ // ensure that no events will be sent until this one is
+ queuePosition := agent.orderedEvents.assignQueuePosition()
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), device, lp, newPorts); err != nil {
+ if err := agent.updateRoutes(context.Background(), device, newPort, agent.listLogicalDevicePorts()); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
- logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
}
- // Send a port update event
- agent.portUpdated(oldPorts, newPorts)
+ // send event, and allow any queued events to be sent as well
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
}()
-
return nil
}
-// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
-func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
- newPorts = make(map[string]*voltha.LogicalPort, len(newList))
- changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
- deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
-
- for _, n := range newList {
- newPorts[n.Id] = n
+// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) error {
+ logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
+ if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+ logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+ return nil
}
- for _, o := range oldList {
- if n, have := newPorts[o.Id]; have {
- delete(newPorts, o.Id) // not new
- if !proto.Equal(n, o) {
- changedPorts[n.Id] = n // changed
- }
- } else {
- deletedPorts[o.Id] = o // deleted
+ tmpPort := &voltha.LogicalPort{
+ RootPort: false,
+ DeviceId: childDevice.Id,
+ Id: port.Label,
+ DevicePortNo: port.PortNo,
+ OfpPort: &voltha.OfpPort{
+ PortNo: port.PortNo,
+ },
+ OfpPortStats: &ofp.OfpPortStats{},
+ }
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ if err != nil {
+ return err
+ }
+ defer portHandle.Unlock()
+
+ if !created {
+ logger.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
+ // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
+ // the port capability request that the Core makes following a port create event.
+ // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
+ // external request runs), this is a temporary hack to avoid updating port state before the port is ready
+
+ // First get the port capability
+ portCap, err := agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo)
+ if err != nil {
+ logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+ return err
+ }
+
+ logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
+ newPort := portCap.Port
+ newPort.RootPort = false
+ newPort.DeviceId = childDevice.Id
+ newPort.Id = port.Label
+ newPort.DevicePortNo = port.PortNo
+ newPort.OfpPort.PortNo = port.PortNo
+
+ // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
+ // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
+ // while doing this, and can be removed later.
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ if err := portHandle.Delete(context.Background()); err != nil {
+ return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
}
+ return err
}
- return newPorts, changedPorts, deletedPorts
-}
+ // ensure that no events will be sent until this one is
+ queuePosition := agent.orderedEvents.assignQueuePosition()
-// portUpdated is invoked when a port is updated on the logical device
-func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
- // Get the difference between the two list
- newPorts, changedPorts, deletedPorts := diff(prevPorts, currPorts)
-
- // Send the port change events to the OF controller
- for _, newP := range newPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
- }
- for _, change := range changedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
- }
- for _, del := range deletedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
- }
-
+ // Setup the routes for this device and then send the port update event to the OF Controller
+ go func() {
+ // First setup the routes
+ if err := agent.updateRoutes(context.Background(), childDevice, newPort, agent.listLogicalDevicePorts()); err != nil {
+ // This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
+ // created yet.
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
+ }
+ // send event, and allow any queued events to be sent as well
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
+ }()
return nil
}
-//GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
-//returns their port numbers. This function is invoked only during flow decomposition where the lock on the logical
-//device is already held. Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
- lPorts := make([]uint32, 0)
- var exclPort uint32
- if len(excludePort) == 1 {
- exclPort = excludePort[0]
- }
- lDevice := agent.getLogicalDeviceWithoutLock()
- for _, port := range lDevice.Ports {
- if port.OfpPort.PortNo != exclPort {
- lPorts = append(lPorts, port.OfpPort.PortNo)
- }
- }
- return lPorts
+// send is a convenience to avoid calling both assignQueuePosition and qp.send
+func (e *orderedEvents) send(agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
+ qp := e.assignQueuePosition()
+ go qp.send(agent, deviceID, reason, desc)
}
-// helpers for agent.logicalPortsNo
+// TODO: shouldn't need to guarantee event ordering like this
+// event ordering should really be protected by per-LogicalPort lock
+// once routing uses on-demand calculation only, this should be changed
+// assignQueuePosition ensures that no events will be sent until this thread calls send() on the returned queuePosition
+func (e *orderedEvents) assignQueuePosition() queuePosition {
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
-func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- if exist := agent.logicalPortsNo[portNo]; !exist {
- agent.logicalPortsNo[portNo] = nniPort
+ prev := e.last
+ next := make(chan struct{})
+ e.last = next
+ return queuePosition{
+ prev: prev,
+ next: next,
}
}
-func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, lp := range lps {
- if exist := agent.logicalPortsNo[lp.DevicePortNo]; !exist {
- agent.logicalPortsNo[lp.DevicePortNo] = lp.RootPort
- }
- }
+// orderedEvents guarantees the order that events are sent, while allowing events to back up.
+type orderedEvents struct {
+ mutex sync.Mutex
+ last <-chan struct{}
}
-func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, pNo := range portsNo {
- delete(agent.logicalPortsNo, pNo)
- }
+type queuePosition struct {
+ prev <-chan struct{}
+ next chan<- struct{}
}
+// send waits for its turn, then sends the event, then notifies the next in line
+func (qp queuePosition) send(agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
+ if qp.prev != nil {
+ <-qp.prev // wait for turn
+ }
+ agent.ldeviceMgr.SendChangeEvent(deviceID, reason, desc)
+ close(qp.next) // notify next
+}
+
+// GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
+// returns their port numbers.
+func (agent *LogicalAgent) GetWildcardInputPorts(excludePort uint32) map[uint32]struct{} {
+ portIDs := agent.portLoader.ListIDs()
+ delete(portIDs, excludePort)
+ return portIDs
+}
+
+// isNNIPort return true iff the specified port belongs to the parent (OLT) device
func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- if exist := agent.logicalPortsNo[portNo]; exist {
- return agent.logicalPortsNo[portNo]
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return false
}
- return false
+ defer portHandle.Unlock()
+
+ // any root-device logical port is an NNI port
+ return portHandle.GetReadOnly().RootPort
}
-func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- return portNo, nil
- }
+// getAnyNNIPort returns an NNI port
+func (agent *LogicalAgent) getAnyNNIPort() (uint32, error) {
+ for portID := range agent.portLoader.ListIDsForDevice(agent.rootDeviceID) {
+ return portID, nil
}
return 0, status.Error(codes.NotFound, "No NNI port found")
}
-//GetNNIPorts returns NNI ports.
-func (agent *LogicalAgent) GetNNIPorts() []uint32 {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- nniPorts := make([]uint32, 0)
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- nniPorts = append(nniPorts, portNo)
- }
- }
- return nniPorts
+//GetNNIPorts returns all NNI ports
+func (agent *LogicalAgent) GetNNIPorts() map[uint32]struct{} {
+ return agent.portLoader.ListIDsForDevice(agent.rootDeviceID)
}
// getUNILogicalPortNo returns the UNI logical port number specified in the flow