VOL-3121 - Separated ports from devices.

Similar to flows/groups/meters/logical ports.
Also added ListDevicePorts and GetDevicePort to the adapter API.
Also removed unused `// +build integration` tests.

Change-Id: I586adb9f46a249c9430d4205ef5db2d105dbbe06
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index b3de89d..5ac7aca 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -21,198 +21,229 @@
 	"fmt"
 
 	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/rw_core/core/device/port"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
-// getPorts retrieves the ports information of the device based on the port type.
-func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
-	logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
-	ports := &voltha.Ports{}
-	if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
-		for _, port := range device.Ports {
-			if port.Type == portType {
-				ports.Items = append(ports.Items, port)
-			}
+// listDevicePorts returns device ports
+func (agent *Agent) listDevicePorts() map[uint32]*voltha.Port {
+	portIDs := agent.portLoader.ListIDs()
+	ports := make(map[uint32]*voltha.Port, len(portIDs))
+	for portID := range portIDs {
+		if portHandle, have := agent.portLoader.Lock(portID); have {
+			ports[portID] = portHandle.GetReadOnly()
+			portHandle.Unlock()
 		}
 	}
 	return ports
 }
 
-func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+	logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+	ports := &voltha.Ports{}
+	for _, port := range agent.listDevicePorts() {
+		if port.Type == portType {
+			ports.Items = append(ports.Items, port)
+		}
+	}
+	return ports
+}
+
+func (agent *Agent) getDevicePort(portID uint32) (*voltha.Port, error) {
+	portHandle, have := agent.portLoader.Lock(portID)
+	if !have {
+		return nil, status.Errorf(codes.NotFound, "port-%d", portID)
+	}
+	defer portHandle.Unlock()
+	return portHandle.GetReadOnly(), nil
+}
+
+func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
 	logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
+
+	for portID := range agent.portLoader.ListIDs() {
+		if portHandle, have := agent.portLoader.Lock(portID); have {
+			if oldPort := portHandle.GetReadOnly(); (1<<oldPort.Type)&portTypeFilter == 0 { // only update port types not included in the mask
+				// clone top-level port struct
+				newPort := *oldPort
+				newPort.OperStatus = operStatus
+				if err := portHandle.Update(ctx, &newPort); err != nil {
+					portHandle.Unlock()
+					return err
+				}
+
+				// Notify the logical device manager to change the port state
+				// Do this for NNI and UNIs only. PON ports are not known by logical device
+				if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+					go func(portID uint32, ctx context.Context) {
+						if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
+							// TODO: VOL-2707
+							logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
+						}
+					}(portID, context.Background())
+				}
+			}
+			portHandle.Unlock()
+		}
 	}
-	defer agent.requestQueue.RequestComplete()
-	cloned := agent.getDeviceWithoutLock()
-	for _, port := range cloned.Ports {
-		port.OperStatus = operStatus
-	}
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	return nil
 }
 
 func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	// Work only on latest data
-	// TODO: Get list of ports from device directly instead of the entire device
-	cloned := agent.getDeviceWithoutLock()
-
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
 		return status.Errorf(codes.InvalidArgument, "%s", portType)
 	}
-	for _, port := range cloned.Ports {
-		if port.Type == portType && port.PortNo == portNo {
-			port.OperStatus = operStatus
-		}
+
+	portHandle, have := agent.portLoader.Lock(portNo)
+	if !have {
+		return nil
 	}
-	logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	defer portHandle.Unlock()
+
+	port := portHandle.GetReadOnly()
+	if port.Type != portType {
+		return nil
+	}
+
+	newPort := *port // clone top-level port struct
+	newPort.OperStatus = operStatus
+	return portHandle.Update(ctx, &newPort)
 }
 
 func (agent *Agent) deleteAllPorts(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+	device, err := agent.getDevice(ctx)
+	if err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
 
-	cloned := agent.getDeviceWithoutLock()
-
-	if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
-		err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
-		logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
+	if device.AdminState != voltha.AdminState_DISABLED && device.AdminState != voltha.AdminState_DELETED {
+		err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", device.AdminState))
+		logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": device.AdminState, "error": err})
 		return err
 	}
-	if len(cloned.Ports) == 0 {
-		logger.Debugw(ctx, "no-ports-present", log.Fields{"deviceId": agent.deviceID})
-		return nil
-	}
 
-	cloned.Ports = []*voltha.Port{}
-	logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	for portID := range agent.portLoader.ListIDs() {
+		if portHandle, have := agent.portLoader.Lock(portID); have {
+			if err := portHandle.Delete(ctx); err != nil {
+				portHandle.Unlock()
+				return err
+			}
+			portHandle.Unlock()
+		}
+	}
+	return nil
 }
 
 func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 	logger.Debugw(ctx, "addPort", log.Fields{"deviceId": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
-	updatePort := false
-	if cloned.Ports == nil {
-		//	First port
-		logger.Debugw(ctx, "addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
-		cloned.Ports = make([]*voltha.Port, 0)
-	} else {
-		for _, p := range cloned.Ports {
-			if p.Type == port.Type && p.PortNo == port.PortNo {
-				if p.Label == "" && p.Type == voltha.Port_PON_OLT {
-					//Creation of OLT PON port is being processed after a default PON port was created.  Just update it.
-					logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
-					p.Label = port.Label
-					p.OperStatus = port.OperStatus
-					updatePort = true
-					break
-				}
-				logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
-				return nil
-			}
-		}
+	port.AdminState = voltha.AdminState_ENABLED
+
+	portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
+	if err != nil {
+		return err
 	}
-	if !updatePort {
-		cp := proto.Clone(port).(*voltha.Port)
-		// Set the admin state of the port to ENABLE
-		cp.AdminState = voltha.AdminState_ENABLED
-		cloned.Ports = append(cloned.Ports, cp)
+	defer portHandle.Unlock()
+
+	if created {
+		return nil
 	}
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+
+	oldPort := portHandle.GetReadOnly()
+	if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
+		logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+		return nil
+	}
+
+	// Creation of OLT PON port is being processed after a default PON port was created.  Just update it.
+	logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": oldPort, "port-to-add": port})
+	newPort := *oldPort // clone top-level port struct
+	newPort.Label = port.Label
+	newPort.OperStatus = port.OperStatus
+
+	return portHandle.Update(ctx, &newPort)
 }
 
 func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 	logger.Debugw(ctx, "adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
 
-	cloned := agent.getDeviceWithoutLock()
-
-	// Get the peer port on the device based on the peerPort no
-	found := false
-	for _, port := range cloned.Ports {
-		if port.PortNo == peerPort.PortNo { // found peerPort
-			cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
-			port.Peers = append(port.Peers, cp)
-			logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
-			found = true
-			break
-		}
-	}
-	if !found && agent.isRootdevice {
-		// An ONU PON port has been created before the corresponding creation of the OLT PON port.  Create the OLT PON port
-		// with default values which will be updated once the OLT PON port creation is processed.
+	var portHandle *port.Handle
+	if agent.isRootDevice {
+		// If an ONU PON port needs to be referenced before the corresponding creation of the OLT PON port, then create the OLT PON port
+		// with default values, and update it later when the OLT PON port creation is processed.
 		ponPort := &voltha.Port{
 			PortNo:     peerPort.PortNo,
 			Type:       voltha.Port_PON_OLT,
 			AdminState: voltha.AdminState_ENABLED,
 			DeviceId:   agent.deviceID,
-			Peers:      []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
+			Peers:      []*voltha.Port_PeerPort{peerPort},
 		}
-		cloned.Ports = append(cloned.Ports, ponPort)
-		logger.Infow(ctx, "adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+
+		h, created, err := agent.portLoader.LockOrCreate(ctx, ponPort)
+		if err != nil {
+			return err
+		}
+		defer h.Unlock()
+
+		if created {
+			logger.Infow(ctx, "added-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+			return nil
+		}
+
+		portHandle = h
+	} else {
+		h, have := agent.portLoader.Lock(peerPort.PortNo)
+		if !have {
+			return nil
+		}
+		defer h.Unlock()
+
+		portHandle = h
 	}
 
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
+
+	newPort := proto.Clone(portHandle.GetReadOnly()).(*voltha.Port)
+	newPort.Peers = append(newPort.Peers, peerPort)
+
+	return portHandle.Update(ctx, newPort)
 }
 
-func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
-	var cp *voltha.Port
-	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
-	for _, port := range device.Ports {
-		if port.PortNo == Port.PortNo {
-			port.AdminState = voltha.AdminState_DISABLED
-			cp = proto.Clone(port).(*voltha.Port)
-			break
+func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
+	logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
 
-		}
+	portHandle, have := agent.portLoader.Lock(portID)
+	if !have {
+		return status.Errorf(codes.InvalidArgument, "%v", portID)
 	}
-	if cp == nil {
-		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+	defer portHandle.Unlock()
+
+	oldPort := portHandle.GetReadOnly()
+
+	if oldPort.Type != voltha.Port_PON_OLT {
+		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
 	}
 
-	if cp.Type != voltha.Port_PON_OLT {
-		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
-	}
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
-		logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	newPort := *oldPort
+	newPort.AdminState = voltha.AdminState_DISABLED
+	if err := portHandle.Update(ctx, &newPort); err != nil {
 		return err
 	}
 
 	//send request to adapter
+	device, err := agent.getDevice(ctx)
+	if err != nil {
+		return err
+	}
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
+	ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
 	if err != nil {
 		cancel()
 		return err
@@ -221,39 +252,34 @@
 	return nil
 }
 
-func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
+	logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+
+	portHandle, have := agent.portLoader.Lock(portID)
+	if !have {
+		return status.Errorf(codes.InvalidArgument, "%v", portID)
+	}
+	defer portHandle.Unlock()
+
+	oldPort := portHandle.GetReadOnly()
+
+	if oldPort.Type != voltha.Port_PON_OLT {
+		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
+	}
+
+	newPort := *oldPort
+	newPort.AdminState = voltha.AdminState_ENABLED
+	if err := portHandle.Update(ctx, &newPort); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
 
-	var cp *voltha.Port
-	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
-	for _, port := range device.Ports {
-		if port.PortNo == Port.PortNo {
-			port.AdminState = voltha.AdminState_ENABLED
-			cp = proto.Clone(port).(*voltha.Port)
-			break
-		}
-	}
-
-	if cp == nil {
-		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
-	}
-
-	if cp.Type != voltha.Port_PON_OLT {
-		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
-	}
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
-		logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
-		return err
-	}
 	//send request to adapter
+	device, err := agent.getDevice(ctx)
+	if err != nil {
+		return err
+	}
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
+	ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
 	if err != nil {
 		cancel()
 		return err