VOL-3121 - Separated ports from devices.
This is similar to how flows, groups, meters, and logical ports are handled.
Also added ListDevicePorts and GetDevicePort to the adapter API.
Also removed unused `// +build integration` tests.
Change-Id: I586adb9f46a249c9430d4205ef5db2d105dbbe06
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index b3de89d..5ac7aca 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -21,198 +21,229 @@
"fmt"
"github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-// getPorts retrieves the ports information of the device based on the port type.
-func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
- ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
- for _, port := range device.Ports {
- if port.Type == portType {
- ports.Items = append(ports.Items, port)
- }
+// listDevicePorts returns the device's ports as a map keyed by port ID.
+func (agent *Agent) listDevicePorts() map[uint32]*voltha.Port {
+ portIDs := agent.portLoader.ListIDs()
+ ports := make(map[uint32]*voltha.Port, len(portIDs))
+ for portID := range portIDs {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ ports[portID] = portHandle.GetReadOnly()
+ portHandle.Unlock()
}
}
return ports
}
-func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+ ports := &voltha.Ports{}
+ for _, port := range agent.listDevicePorts() {
+ if port.Type == portType {
+ ports.Items = append(ports.Items, port)
+ }
+ }
+ return ports
+}
+
+func (agent *Agent) getDevicePort(portID uint32) (*voltha.Port, error) {
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return nil, status.Errorf(codes.NotFound, "port-%d", portID)
+ }
+ defer portHandle.Unlock()
+ return portHandle.GetReadOnly(), nil
+}
+
+func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
+
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ if oldPort := portHandle.GetReadOnly(); (1<<oldPort.Type)&portTypeFilter == 0 { // only update port types not included in the mask
+ // clone top-level port struct
+ newPort := *oldPort
+ newPort.OperStatus = operStatus
+ if err := portHandle.Update(ctx, &newPort); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+
+ // Notify the logical device manager to change the port state.
+ // Do this for NNI and UNI ports only; PON ports are not known to the logical device.
+ if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+ go func(portID uint32, ctx context.Context) {
+ if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
+ // TODO: VOL-2707
+ logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
+ }
+ }(portID, context.Background())
+ }
+ }
+ portHandle.Unlock()
+ }
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
- for _, port := range cloned.Ports {
- port.OperStatus = operStatus
- }
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return nil
}
func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Work only on latest data
- // TODO: Get list of ports from device directly instead of the entire device
- cloned := agent.getDeviceWithoutLock()
-
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
return status.Errorf(codes.InvalidArgument, "%s", portType)
}
- for _, port := range cloned.Ports {
- if port.Type == portType && port.PortNo == portNo {
- port.OperStatus = operStatus
- }
+
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return nil
}
- logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ defer portHandle.Unlock()
+
+ port := portHandle.GetReadOnly()
+ if port.Type != portType {
+ return nil
+ }
+
+ newPort := *port // clone top-level port struct
+ newPort.OperStatus = operStatus
+ return portHandle.Update(ctx, &newPort)
}
func (agent *Agent) deleteAllPorts(ctx context.Context) error {
logger.Debugw(ctx, "deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ device, err := agent.getDevice(ctx)
+ if err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
- if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
- err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
- logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
+ if device.AdminState != voltha.AdminState_DISABLED && device.AdminState != voltha.AdminState_DELETED {
+ err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", device.AdminState))
+ logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": device.AdminState, "error": err})
return err
}
- if len(cloned.Ports) == 0 {
- logger.Debugw(ctx, "no-ports-present", log.Fields{"deviceId": agent.deviceID})
- return nil
- }
- cloned.Ports = []*voltha.Port{}
- logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ if err := portHandle.Delete(ctx); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+ portHandle.Unlock()
+ }
+ }
+ return nil
}
func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "addPort", log.Fields{"deviceId": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
- updatePort := false
- if cloned.Ports == nil {
- // First port
- logger.Debugw(ctx, "addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
- cloned.Ports = make([]*voltha.Port, 0)
- } else {
- for _, p := range cloned.Ports {
- if p.Type == port.Type && p.PortNo == port.PortNo {
- if p.Label == "" && p.Type == voltha.Port_PON_OLT {
- //Creation of OLT PON port is being processed after a default PON port was created. Just update it.
- logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
- p.Label = port.Label
- p.OperStatus = port.OperStatus
- updatePort = true
- break
- }
- logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
- return nil
- }
- }
+ port.AdminState = voltha.AdminState_ENABLED
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
+ if err != nil {
+ return err
}
- if !updatePort {
- cp := proto.Clone(port).(*voltha.Port)
- // Set the admin state of the port to ENABLE
- cp.AdminState = voltha.AdminState_ENABLED
- cloned.Ports = append(cloned.Ports, cp)
+ defer portHandle.Unlock()
+
+ if created {
+ return nil
}
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+
+ oldPort := portHandle.GetReadOnly()
+ if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
+ logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+ return nil
+ }
+
+ // Creation of OLT PON port is being processed after a default PON port was created. Just update it.
+ logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": oldPort, "port-to-add": port})
+ newPort := *oldPort // clone top-level port struct
+ newPort.Label = port.Label
+ newPort.OperStatus = port.OperStatus
+
+ return portHandle.Update(ctx, &newPort)
}
func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
- cloned := agent.getDeviceWithoutLock()
-
- // Get the peer port on the device based on the peerPort no
- found := false
- for _, port := range cloned.Ports {
- if port.PortNo == peerPort.PortNo { // found peerPort
- cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
- port.Peers = append(port.Peers, cp)
- logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
- found = true
- break
- }
- }
- if !found && agent.isRootdevice {
- // An ONU PON port has been created before the corresponding creation of the OLT PON port. Create the OLT PON port
- // with default values which will be updated once the OLT PON port creation is processed.
+ var portHandle *port.Handle
+ if agent.isRootDevice {
+ // If an ONU PON port needs to be referenced before the corresponding creation of the OLT PON port, then create the OLT PON port
+ // with default values, and update it later when the OLT PON port creation is processed.
ponPort := &voltha.Port{
PortNo: peerPort.PortNo,
Type: voltha.Port_PON_OLT,
AdminState: voltha.AdminState_ENABLED,
DeviceId: agent.deviceID,
- Peers: []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
+ Peers: []*voltha.Port_PeerPort{peerPort},
}
- cloned.Ports = append(cloned.Ports, ponPort)
- logger.Infow(ctx, "adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+
+ h, created, err := agent.portLoader.LockOrCreate(ctx, ponPort)
+ if err != nil {
+ return err
+ }
+ defer h.Unlock()
+
+ if created {
+ logger.Infow(ctx, "added-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+ return nil
+ }
+
+ portHandle = h
+ } else {
+ h, have := agent.portLoader.Lock(peerPort.PortNo)
+ if !have {
+ return nil
+ }
+ defer h.Unlock()
+
+ portHandle = h
}
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
+
+ newPort := proto.Clone(portHandle.GetReadOnly()).(*voltha.Port)
+ newPort.Peers = append(newPort.Peers, peerPort)
+
+ return portHandle.Update(ctx, newPort)
}
-func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_DISABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
+func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
+ logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
- }
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return status.Errorf(codes.InvalidArgument, "%v", portID)
}
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ if oldPort.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
}
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ newPort := *oldPort
+ newPort.AdminState = voltha.AdminState_DISABLED
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
//send request to adapter
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
cancel()
return err
@@ -221,39 +252,34 @@
return nil
}
-func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
+ logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return status.Errorf(codes.InvalidArgument, "%v", portID)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ if oldPort.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
+ }
+
+ newPort := *oldPort
+ newPort.AdminState = voltha.AdminState_ENABLED
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_ENABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
- }
- }
-
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
- }
-
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
- return err
- }
//send request to adapter
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
cancel()
return err