VOL-3121 - Separated ports from devices.
Similar to flows/groups/meters/logical ports.
Also added ListDevicePorts and GetDevicePort to the adapter API.
Also removed unused `// +build integration` tests.
Change-Id: I586adb9f46a249c9430d4205ef5db2d105dbbe06
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index d1e655e..79093a7 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -19,6 +19,7 @@
import (
"context"
"errors"
+
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -455,6 +456,7 @@
return nil, err
}
deviceID := &voltha.ID{}
+ portTypeFilter := &ic.IntType{}
operStatus := &ic.IntType{}
transactionID := &ic.StrType{}
for _, arg := range args {
@@ -464,6 +466,11 @@
logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
return nil, err
}
+ case "port_type_filter":
+ if err := ptypes.UnmarshalAny(arg.Value, portTypeFilter); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
case "oper_status":
if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-operStatus", log.Fields{"error": err})
@@ -478,7 +485,7 @@
}
logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.UpdatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
+ if err := rhp.deviceMgr.UpdatePortsState(context.TODO(), deviceID.Id, uint32(portTypeFilter.Val), voltha.OperStatus_Types(operStatus.Val)); err != nil {
logger.Debugw(ctx, "unable-to-update-ports-state", log.Fields{"error": err})
return nil, err
}
@@ -572,6 +579,68 @@
return &empty.Empty{}, nil
}
+// GetDevicePort returns a single port
+func (rhp *AdapterRequestHandlerProxy) GetDevicePort(ctx context.Context, args []*ic.Argument) (*voltha.Port, error) {
+ if len(args) < 3 {
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ deviceID := &voltha.ID{}
+ portNo := &ic.IntType{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_no":
+ if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-port-no", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ logger.Debugw(ctx, "GetDevicePort", log.Fields{"deviceID": deviceID.Id, "portNo": portNo.Val, "transactionID": transactionID.Val})
+
+ return rhp.deviceMgr.GetDevicePort(context.TODO(), deviceID.Id, uint32(portNo.Val))
+}
+
+// ListDevicePorts returns all ports belonging to the device
+func (rhp *AdapterRequestHandlerProxy) ListDevicePorts(ctx context.Context, args []*ic.Argument) (*voltha.Ports, error) {
+ if len(args) < 2 {
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ deviceID := &voltha.ID{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ logger.Debugw(ctx, "ListDevicePorts", log.Fields{"deviceID": deviceID.Id, "transactionID": transactionID.Val})
+
+ return rhp.deviceMgr.ListDevicePorts(context.TODO(), deviceID)
+}
+
// ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
// This will trigger the Core to disable all the child devices.
func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index 6fcd511..92ba701 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -38,6 +38,7 @@
type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
type isLogicalDevicePortsConditionSatisfied func(ports []*voltha.LogicalPort) bool
type isDeviceConditionSatisfied func(ld *voltha.Device) bool
+type isDevicePortsConditionSatisfied func(ports *voltha.Ports) bool
type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
type isConditionSatisfied func() bool
@@ -88,6 +89,36 @@
}
}
+func waitUntilDevicePortsReadiness(deviceID string,
+ timeout time.Duration,
+ verificationFunction isDevicePortsConditionSatisfied,
+ nbi *NBIHandler) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ ports, _ := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: deviceID})
+ if verificationFunction(ports) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
+ }
+}
+
func waitUntilLogicalDeviceReadiness(oltDeviceID string,
timeout time.Duration,
nbi *NBIHandler,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 434df90..6b37a14 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -205,6 +205,8 @@
// Now, verify the details of the device. First get the latest update
d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
assert.Nil(t, err)
+ dPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: device.Id})
+ assert.Nil(t, err)
assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
@@ -227,8 +229,8 @@
} else {
assert.Error(t, errors.New("invalid-device-type"))
}
- assert.Equal(t, 2, len(d.Ports))
- for _, p := range d.Ports {
+ assert.Equal(t, 2, len(dPorts.Items))
+ for _, p := range dPorts.Items {
assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
@@ -635,8 +637,10 @@
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
assert.NotNil(t, oltDevice)
+ oltPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
- for _, cp = range oltDevice.Ports {
+ for _, cp = range oltPorts.Items {
if cp.Type == voltha.Port_PON_OLT {
break
}
@@ -649,15 +653,15 @@
_, err = nbi.DisablePort(getContext(), cp)
assert.Nil(t, err)
// Wait for the olt device Port to be disabled
- var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
- for _, port := range device.Ports {
+ var vdFunction isDevicePortsConditionSatisfied = func(ports *voltha.Ports) bool {
+ for _, port := range ports.Items {
if port.PortNo == cp.PortNo {
return port.AdminState == voltha.AdminState_DISABLED
}
}
return false
}
- err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
var vlFunction = func(ports []*voltha.LogicalPort) bool {
@@ -677,15 +681,15 @@
assert.Nil(t, err)
// Wait for the olt device Port to be enabled
- vdFunction = func(device *voltha.Device) bool {
- for _, port := range device.Ports {
+ vdFunction = func(ports *voltha.Ports) bool {
+ for _, port := range ports.Items {
if port.PortNo == cp.PortNo {
return port.AdminState == voltha.AdminState_ENABLED
}
}
return false
}
- err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
vlFunction = func(ports []*voltha.LogicalPort) bool {
@@ -701,7 +705,7 @@
assert.Nil(t, err)
// Disable a non-PON port
- for _, cp = range oltDevice.Ports {
+ for _, cp = range oltPorts.Items {
if cp.Type != voltha.Port_PON_OLT {
break
}
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 2b18296..9fbeb9d 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -29,6 +29,7 @@
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
+ "github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -48,7 +49,7 @@
deviceID string
parentID string
deviceType string
- isRootdevice bool
+ isRootDevice bool
adapterProxy *remote.AdapterProxy
adapterMgr *adapter.Manager
deviceMgr *Manager
@@ -63,32 +64,33 @@
flowLoader *flow.Loader
groupLoader *group.Loader
+ portLoader *port.Loader
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbProxy *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
- var agent Agent
- agent.adapterProxy = ap
- if device.Id == "" {
- agent.deviceID = coreutils.CreateDeviceID()
- } else {
- agent.deviceID = device.Id
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+ deviceID := device.Id
+ if deviceID == "" {
+ deviceID = coreutils.CreateDeviceID()
}
- agent.isRootdevice = device.Root
- agent.parentID = device.ParentId
- agent.deviceType = device.Type
- agent.deviceMgr = deviceMgr
- agent.adapterMgr = deviceMgr.adapterMgr
- agent.exitChannel = make(chan int, 1)
- agent.dbProxy = deviceProxy
- agent.defaultTimeout = timeout
- agent.device = proto.Clone(device).(*voltha.Device)
- agent.requestQueue = coreutils.NewRequestQueue()
- agent.flowLoader = flow.NewLoader(dbProxy.SubPath("flows").Proxy(device.Id))
- agent.groupLoader = group.NewLoader(dbProxy.SubPath("groups").Proxy(device.Id))
-
- return &agent
+ return &Agent{
+ deviceID: deviceID,
+ adapterProxy: ap,
+ isRootDevice: device.Root,
+ parentID: device.ParentId,
+ deviceType: device.Type,
+ deviceMgr: deviceMgr,
+ adapterMgr: deviceMgr.adapterMgr,
+ exitChannel: make(chan int, 1),
+ dbProxy: deviceProxy,
+ defaultTimeout: timeout,
+ device: proto.Clone(device).(*voltha.Device),
+ requestQueue: coreutils.NewRequestQueue(),
+ flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
+ groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
+ portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+ }
}
// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
@@ -124,6 +126,7 @@
// load the flows and groups from KV to cache
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
@@ -134,8 +137,6 @@
device = (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
device.AdminState = voltha.AdminState_PREPROVISIONED
- device.FlowGroups = &ofp.FlowGroups{Items: nil}
- device.Flows = &ofp.Flows{Items: nil}
if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
// Set the default vlan ID to the one specified by the parent adapter. It can be
// overwritten by the child adapter during a device update request
@@ -202,6 +203,7 @@
agent.device = device
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
@@ -714,27 +716,26 @@
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
- //Remove the associated peer ports on the parent device
- parentDevice := agent.getDeviceWithoutLock()
- var updatedPeers []*voltha.Port_PeerPort
- for _, port := range parentDevice.Ports {
- updatedPeers = make([]*voltha.Port_PeerPort, 0)
- for _, peerPort := range port.Peers {
- if peerPort.DeviceId != device.Id {
- updatedPeers = append(updatedPeers, peerPort)
+ // Remove the associated peer ports on the parent device
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ oldPort := portHandle.GetReadOnly()
+ updatedPeers := make([]*voltha.Port_PeerPort, 0)
+ for _, peerPort := range oldPort.Peers {
+ if peerPort.DeviceId != device.Id {
+ updatedPeers = append(updatedPeers, peerPort)
+ }
}
+ newPort := *oldPort
+ newPort.Peers = updatedPeers
+ if err := portHandle.Update(ctx, &newPort); err != nil {
+ portHandle.Unlock()
+ return nil
+ }
+ portHandle.Unlock()
}
- port.Peers = updatedPeers
- }
- if err := agent.updateDeviceInStoreWithoutLock(ctx, parentDevice, false, ""); err != nil {
- return err
}
//send request to adapter
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index b3de89d..5ac7aca 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -21,198 +21,229 @@
"fmt"
"github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-// getPorts retrieves the ports information of the device based on the port type.
-func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
- ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
- for _, port := range device.Ports {
- if port.Type == portType {
- ports.Items = append(ports.Items, port)
- }
+// listDevicePorts returns device ports
+func (agent *Agent) listDevicePorts() map[uint32]*voltha.Port {
+ portIDs := agent.portLoader.ListIDs()
+ ports := make(map[uint32]*voltha.Port, len(portIDs))
+ for portID := range portIDs {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ ports[portID] = portHandle.GetReadOnly()
+ portHandle.Unlock()
}
}
return ports
}
-func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+ ports := &voltha.Ports{}
+ for _, port := range agent.listDevicePorts() {
+ if port.Type == portType {
+ ports.Items = append(ports.Items, port)
+ }
+ }
+ return ports
+}
+
+func (agent *Agent) getDevicePort(portID uint32) (*voltha.Port, error) {
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return nil, status.Errorf(codes.NotFound, "port-%d", portID)
+ }
+ defer portHandle.Unlock()
+ return portHandle.GetReadOnly(), nil
+}
+
+func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
+
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ if oldPort := portHandle.GetReadOnly(); (1<<oldPort.Type)&portTypeFilter == 0 { // only update port types not included in the mask
+ // clone top-level port struct
+ newPort := *oldPort
+ newPort.OperStatus = operStatus
+ if err := portHandle.Update(ctx, &newPort); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+
+ // Notify the logical device manager to change the port state
+ // Do this for NNI and UNIs only. PON ports are not known by logical device
+ if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+ go func(portID uint32, ctx context.Context) {
+ if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
+ // TODO: VOL-2707
+ logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
+ }
+ }(portID, context.Background())
+ }
+ }
+ portHandle.Unlock()
+ }
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
- for _, port := range cloned.Ports {
- port.OperStatus = operStatus
- }
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return nil
}
func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Work only on latest data
- // TODO: Get list of ports from device directly instead of the entire device
- cloned := agent.getDeviceWithoutLock()
-
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
return status.Errorf(codes.InvalidArgument, "%s", portType)
}
- for _, port := range cloned.Ports {
- if port.Type == portType && port.PortNo == portNo {
- port.OperStatus = operStatus
- }
+
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return nil
}
- logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ defer portHandle.Unlock()
+
+ port := portHandle.GetReadOnly()
+ if port.Type != portType {
+ return nil
+ }
+
+ newPort := *port // clone top-level port struct
+ newPort.OperStatus = operStatus
+ return portHandle.Update(ctx, &newPort)
}
func (agent *Agent) deleteAllPorts(ctx context.Context) error {
logger.Debugw(ctx, "deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ device, err := agent.getDevice(ctx)
+ if err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
- if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
- err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
- logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
+ if device.AdminState != voltha.AdminState_DISABLED && device.AdminState != voltha.AdminState_DELETED {
+ err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", device.AdminState))
+ logger.Warnw(ctx, "invalid-state-removing-ports", log.Fields{"state": device.AdminState, "error": err})
return err
}
- if len(cloned.Ports) == 0 {
- logger.Debugw(ctx, "no-ports-present", log.Fields{"deviceId": agent.deviceID})
- return nil
- }
- cloned.Ports = []*voltha.Port{}
- logger.Debugw(ctx, "portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ if err := portHandle.Delete(ctx); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+ portHandle.Unlock()
+ }
+ }
+ return nil
}
func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "addPort", log.Fields{"deviceId": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
- updatePort := false
- if cloned.Ports == nil {
- // First port
- logger.Debugw(ctx, "addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
- cloned.Ports = make([]*voltha.Port, 0)
- } else {
- for _, p := range cloned.Ports {
- if p.Type == port.Type && p.PortNo == port.PortNo {
- if p.Label == "" && p.Type == voltha.Port_PON_OLT {
- //Creation of OLT PON port is being processed after a default PON port was created. Just update it.
- logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
- p.Label = port.Label
- p.OperStatus = port.OperStatus
- updatePort = true
- break
- }
- logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
- return nil
- }
- }
+ port.AdminState = voltha.AdminState_ENABLED
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, port)
+ if err != nil {
+ return err
}
- if !updatePort {
- cp := proto.Clone(port).(*voltha.Port)
- // Set the admin state of the port to ENABLE
- cp.AdminState = voltha.AdminState_ENABLED
- cloned.Ports = append(cloned.Ports, cp)
+ defer portHandle.Unlock()
+
+ if created {
+ return nil
}
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+
+ oldPort := portHandle.GetReadOnly()
+ if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
+ logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+ return nil
+ }
+
+ // Creation of OLT PON port is being processed after a default PON port was created. Just update it.
+ logger.Infow(ctx, "update-pon-port-created-by-default", log.Fields{"default-port": oldPort, "port-to-add": port})
+ newPort := *oldPort // clone top-level port struct
+ newPort.Label = port.Label
+ newPort.OperStatus = port.OperStatus
+
+ return portHandle.Update(ctx, &newPort)
}
func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
- cloned := agent.getDeviceWithoutLock()
-
- // Get the peer port on the device based on the peerPort no
- found := false
- for _, port := range cloned.Ports {
- if port.PortNo == peerPort.PortNo { // found peerPort
- cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
- port.Peers = append(port.Peers, cp)
- logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
- found = true
- break
- }
- }
- if !found && agent.isRootdevice {
- // An ONU PON port has been created before the corresponding creation of the OLT PON port. Create the OLT PON port
- // with default values which will be updated once the OLT PON port creation is processed.
+ var portHandle *port.Handle
+ if agent.isRootDevice {
+ // If an ONU PON port needs to be referenced before the corresponding creation of the OLT PON port, then create the OLT PON port
+ // with default values, and update it later when the OLT PON port creation is processed.
ponPort := &voltha.Port{
PortNo: peerPort.PortNo,
Type: voltha.Port_PON_OLT,
AdminState: voltha.AdminState_ENABLED,
DeviceId: agent.deviceID,
- Peers: []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
+ Peers: []*voltha.Port_PeerPort{peerPort},
}
- cloned.Ports = append(cloned.Ports, ponPort)
- logger.Infow(ctx, "adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+
+ h, created, err := agent.portLoader.LockOrCreate(ctx, ponPort)
+ if err != nil {
+ return err
+ }
+ defer h.Unlock()
+
+ if created {
+ logger.Infow(ctx, "added-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+ return nil
+ }
+
+ portHandle = h
+ } else {
+ h, have := agent.portLoader.Lock(peerPort.PortNo)
+ if !have {
+ return nil
+ }
+ defer h.Unlock()
+
+ portHandle = h
}
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ logger.Debugw(ctx, "found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
+
+ newPort := proto.Clone(portHandle.GetReadOnly()).(*voltha.Port)
+ newPort.Peers = append(newPort.Peers, peerPort)
+
+ return portHandle.Update(ctx, newPort)
}
-func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_DISABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
+func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
+ logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
- }
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return status.Errorf(codes.InvalidArgument, "%v", portID)
}
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ if oldPort.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
}
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ newPort := *oldPort
+ newPort.AdminState = voltha.AdminState_DISABLED
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
//send request to adapter
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
cancel()
return err
@@ -221,39 +252,34 @@
return nil
}
-func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
+ logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+
+ portHandle, have := agent.portLoader.Lock(portID)
+ if !have {
+ return status.Errorf(codes.InvalidArgument, "%v", portID)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ if oldPort.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
+ }
+
+ newPort := *oldPort
+ newPort.AdminState = voltha.AdminState_ENABLED
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_ENABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
- }
- }
-
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
- }
-
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw(ctx, "updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
- return err
- }
//send request to adapter
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
cancel()
return err
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index e9bd663..92977ac 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -60,6 +60,7 @@
defaultTimeout time.Duration
maxTimeout time.Duration
device *voltha.Device
+ devicePorts map[uint32]*voltha.Port
done chan int
}
@@ -101,12 +102,12 @@
Reason: "All good",
ConnectStatus: voltha.ConnectStatus_UNKNOWN,
Custom: nil,
- Ports: []*voltha.Port{
- {PortNo: 1, Label: "pon-1", Type: voltha.Port_PON_ONU, AdminState: voltha.AdminState_ENABLED,
- OperStatus: voltha.OperStatus_ACTIVE, Peers: []*voltha.Port_PeerPort{{DeviceId: parentID, PortNo: 1}}},
- {PortNo: 100, Label: "uni-100", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
- OperStatus: voltha.OperStatus_ACTIVE},
- },
+ }
+ test.devicePorts = map[uint32]*voltha.Port{
+ 1: {PortNo: 1, Label: "pon-1", Type: voltha.Port_PON_ONU, AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE, Peers: []*voltha.Port_PeerPort{{DeviceId: parentID, PortNo: 1}}},
+ 100: {PortNo: 100, Label: "uni-100", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE},
}
return test
}
@@ -168,12 +169,17 @@
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
+ for _, port := range dat.devicePorts {
+ err := deviceAgent.addPort(context.TODO(), port)
+ assert.Nil(t, err)
+ }
deviceMgr.addDeviceAgentToMap(deviceAgent)
return deviceAgent
}
func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *Agent, globalWG *sync.WaitGroup) {
originalDevice, err := da.getDevice(context.Background())
+ originalDevicePorts := da.listDevicePorts()
assert.Nil(t, err)
assert.NotNil(t, originalDevice)
var localWG sync.WaitGroup
@@ -227,7 +233,6 @@
expectedChange := proto.Clone(originalDevice).(*voltha.Device)
expectedChange.OperStatus = voltha.OperStatus_ACTIVE
expectedChange.ConnectStatus = voltha.ConnectStatus_REACHABLE
- expectedChange.Ports = append(expectedChange.Ports, portToAdd)
expectedChange.Root = root
expectedChange.Vendor = vendor
expectedChange.Model = model
@@ -237,8 +242,11 @@
expectedChange.Reason = reason
updatedDevice, _ := da.getDevice(context.Background())
+ updatedDevicePorts := da.listDevicePorts()
assert.NotNil(t, updatedDevice)
assert.True(t, proto.Equal(expectedChange, updatedDevice))
+ assert.Equal(t, len(originalDevicePorts)+1, len(updatedDevicePorts))
+ assert.True(t, proto.Equal(updatedDevicePorts[portToAdd.PortNo], portToAdd))
globalWG.Done()
}
diff --git a/rw_core/core/device/flow/loader_test.go b/rw_core/core/device/flow/loader_test.go
index 8973f12..958124d 100644
--- a/rw_core/core/device/flow/loader_test.go
+++ b/rw_core/core/device/flow/loader_test.go
@@ -27,14 +27,14 @@
// TestLoadersIdentical ensures that the group, flow, and meter loaders always have an identical implementation.
func TestLoadersIdentical(t *testing.T) {
- types := []string{"flow", "group", "meter", "logical_port"}
+ types := []string{"flow", "group", "meter", "port", "logical_port"}
identical := [][]string{
- {`ofp\.OfpFlowStats`, `ofp\.OfpGroupEntry`, `ofp\.OfpMeterEntry`, `voltha\.LogicalPort`},
- {`\.Id`, `\.Desc\.GroupId`, `\.Config\.MeterId`, `\.OfpPort\.PortNo`},
- {`uint64`, `uint32`, `uint32`, `uint32`},
- {`Flow`, `Group`, `Meter`, `Port`},
- {`flow`, `group`, `meter`, `port|logical_port`},
+ {`ofp\.OfpFlowStats`, `ofp\.OfpGroupEntry`, `ofp\.OfpMeterEntry`, `voltha\.Port`, `voltha\.LogicalPort`},
+ {`\.Id`, `\.Desc\.GroupId`, `\.Config\.MeterId`, `\.PortNo`, `\.OfpPort\.PortNo`},
+ {`uint64`, `uint32`, `uint32`, `uint32`, `uint32`},
+ {`Flow`, `Group`, `Meter`, `Port`, `Port`},
+ {`flow`, `group`, `meter`, `port`, `port|logical_port`},
}
regexes := make([][]*regexp.Regexp, len(identical[0]))
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 6a14e5a..07f85e8 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -66,13 +66,14 @@
func newLogicalAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
- agent := &LogicalAgent{
+ return &LogicalAgent{
logicalDeviceID: id,
serialNumber: sn,
rootDeviceID: deviceID,
deviceMgr: deviceMgr,
ldProxy: ldProxy,
ldeviceMgr: ldeviceMgr,
+ deviceRoutes: route.NewDeviceRoutes(id, deviceID, deviceMgr.listDevicePorts),
flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
defaultTimeout: defaultTimeout,
requestQueue: coreutils.NewRequestQueue(),
@@ -82,8 +83,6 @@
meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
- agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
- return agent
}
// start creates the logical device and add it to the data model
@@ -123,9 +122,6 @@
ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
logger.Debugw(ctx, "Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
- ld.Flows = &ofp.Flows{Items: nil}
- ld.FlowGroups = &ofp.FlowGroups{Items: nil}
- ld.Ports = []*voltha.LogicalPort{}
// Save the logical device
if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
@@ -160,6 +156,9 @@
// Update the last data
agent.logicalDevice = ld
+ // now that the root device is known, create DeviceRoutes with it
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.rootDeviceID, agent.deviceMgr.listDevicePorts)
+
// load the flows, meters and groups from KV to cache
agent.flowLoader.Load(ctx)
agent.meterLoader.Load(ctx)
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 0229c18..40f058e 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -44,15 +44,15 @@
return ret
}
-func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
logger.Debugw(ctx, "updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
switch port.Type {
case voltha.Port_ETHERNET_NNI:
- if err := agent.addNNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addNNILogicalPort(ctx, device.Id, devicePorts, port); err != nil {
return err
}
case voltha.Port_ETHERNET_UNI:
- if err := agent.addUNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addUNILogicalPort(ctx, device.Id, device.AdminState, device.OperStatus, devicePorts, port); err != nil {
return err
}
case voltha.Port_PON_OLT:
@@ -60,16 +60,16 @@
go func() {
if err := agent.buildRoutes(context.Background()); err != nil {
// Not an error - temporary state
- logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
}()
//fallthrough
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err := agent.updateAllRoutes(context.Background(), device); err != nil {
+ if err := agent.updateAllRoutes(context.Background(), device.Id, devicePorts); err != nil {
// Not an error - temporary state
- logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
+ logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
}()
default:
@@ -99,13 +99,21 @@
for _, child := range children.Items {
response := coreutils.NewResponse()
responses = append(responses, response)
- go func(child *voltha.Device) {
- if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
+ go func(ctx context.Context, child *voltha.Device) {
+ defer response.Done()
+
+ childPorts, err := agent.deviceMgr.listDevicePorts(ctx, child.Id)
+ if err != nil {
+ logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
+ response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
+ return
+ }
+
+ if err = agent.setupUNILogicalPorts(ctx, child, childPorts); err != nil {
logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- response.Done()
- }(child)
+ }(context.Background(), child)
}
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
@@ -118,19 +126,18 @@
func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
logger.Infow(ctx, "setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
- var err error
- var device *voltha.Device
- if device, err = agent.deviceMgr.getDevice(ctx, deviceID); err != nil {
- logger.Errorw(ctx, "error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
+ devicePorts, err := agent.deviceMgr.listDevicePorts(ctx, deviceID)
+ if err != nil {
+ logger.Errorw(ctx, "error-retrieving-device-ports", log.Fields{"error": err, "deviceId": deviceID})
return err
}
//Get UNI port number
- for _, port := range device.Ports {
+ for _, port := range devicePorts {
if port.Type == voltha.Port_ETHERNET_NNI {
- if err = agent.addNNILogicalPort(ctx, device, port); err != nil {
- logger.Errorw(ctx, "error-adding-UNI-port", log.Fields{"error": err})
+ if err = agent.addNNILogicalPort(ctx, deviceID, devicePorts, port); err != nil {
+ logger.Errorw(ctx, "error-adding-NNI-port", log.Fields{"error": err})
}
}
}
@@ -155,25 +162,6 @@
return nil
}
-// updatePortsState updates the ports state related to the device
-func (agent *LogicalAgent) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
- logger.Infow(ctx, "updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-
- for portNo := range agent.portLoader.ListIDsForDevice(deviceID) {
- if portHandle, have := agent.portLoader.Lock(portNo); have {
- newPort := clonePortSetState(portHandle.GetReadOnly(), state)
- if err := portHandle.Update(ctx, newPort); err != nil {
- portHandle.Unlock()
- return err
- }
- agent.orderedEvents.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
-
- portHandle.Unlock()
- }
- }
- return nil
-}
-
func clonePortSetState(oldPort *voltha.LogicalPort, state voltha.OperStatus_Types) *voltha.LogicalPort {
newPort := *oldPort // only clone the struct(s) that will be changed
newOfpPort := *oldPort.OfpPort
@@ -190,14 +178,14 @@
}
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
-func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
logger.Infow(ctx, "setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
//Get UNI port number
- for _, port := range childDevice.Ports {
+ for _, port := range childDevicePorts {
if port.Type == voltha.Port_ETHERNET_UNI {
- if err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
+ if err = agent.addUNILogicalPort(ctx, childDevice.Id, childDevice.AdminState, childDevice.OperStatus, childDevicePorts, port); err != nil {
logger.Errorw(ctx, "error-adding-UNI-port", log.Fields{"error": err})
}
}
@@ -312,7 +300,7 @@
// added and an error in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
logger.Debugw(ctx, "addNNILogicalPort", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
label := fmt.Sprintf("nni-%d", port.PortNo)
@@ -322,7 +310,7 @@
ofpPort.Name = label
nniPort := &voltha.LogicalPort{
RootPort: true,
- DeviceId: device.Id,
+ DeviceId: deviceID,
Id: label,
DevicePortNo: port.PortNo,
OfpPort: &ofpPort,
@@ -346,7 +334,7 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), device, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(context.Background(), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
@@ -362,10 +350,10 @@
// added and an error in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, deviceID string, deviceAdminState voltha.AdminState_Types, deviceOperStatus voltha.OperStatus_Types, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
logger.Debugw(ctx, "addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
- logger.Infow(ctx, "device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+ if deviceAdminState != voltha.AdminState_ENABLED || deviceOperStatus != voltha.OperStatus_ACTIVE {
+ logger.Infow(ctx, "device-not-ready", log.Fields{"deviceId": deviceID, "admin": deviceAdminState, "oper": deviceOperStatus})
return nil
}
ofpPort := *port.OfpPort
@@ -373,7 +361,7 @@
ofpPort.PortNo = port.PortNo
uniPort := &voltha.LogicalPort{
RootPort: false,
- DeviceId: childDevice.Id,
+ DeviceId: deviceID,
Id: port.Label,
DevicePortNo: port.PortNo,
OfpPort: &ofpPort,
@@ -397,7 +385,7 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), childDevice, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(context.Background(), deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 8043e54..62e8a5f 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -108,10 +108,10 @@
}
//updateRoutes updates the device routes
-func (agent *LogicalAgent) updateRoutes(ctx context.Context, device *voltha.Device, lp *voltha.LogicalPort, lps map[uint32]*voltha.LogicalPort) error {
- logger.Debugw(ctx, "updateRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "port:": lp})
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port, lp *voltha.LogicalPort, lps map[uint32]*voltha.LogicalPort) error {
+ logger.Debugw(ctx, "updateRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": deviceID, "port:": lp})
- if err := agent.deviceRoutes.AddPort(ctx, lp, device, lps); err != nil {
+ if err := agent.deviceRoutes.AddPort(ctx, lp, deviceID, devicePorts, lps); err != nil {
return err
}
if err := agent.deviceRoutes.Print(ctx); err != nil {
@@ -121,10 +121,10 @@
}
//updateAllRoutes updates the device routes using all the logical ports on that device
-func (agent *LogicalAgent) updateAllRoutes(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "updateAllRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "ports-count": len(device.Ports)})
+func (agent *LogicalAgent) updateAllRoutes(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port) error {
+ logger.Debugw(ctx, "updateAllRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": deviceID, "ports-count": len(devicePorts)})
- if err := agent.deviceRoutes.AddAllPorts(ctx, device, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.deviceRoutes.AddAllPorts(ctx, deviceID, devicePorts, agent.listLogicalDevicePorts(ctx)); err != nil {
return err
}
if err := agent.deviceRoutes.Print(ctx); err != nil {
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 5cd72b4..89b3cbe 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -53,6 +53,7 @@
defaultTimeout time.Duration
maxTimeout time.Duration
logicalDevice *voltha.LogicalDevice
+ logicalPorts map[uint32]*voltha.LogicalPort
deviceIds []string
done chan int
}
@@ -89,42 +90,42 @@
ofp.OfpCapabilities_OFPC_GROUP_STATS),
},
RootDeviceId: test.deviceIds[0],
- Ports: []*voltha.LogicalPort{
- {
- Id: "1001",
- DeviceId: test.deviceIds[0],
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 4,
- State: 4,
- },
+ }
+ test.logicalPorts = map[uint32]*voltha.LogicalPort{
+ 1: {
+ Id: "1001",
+ DeviceId: test.deviceIds[0],
+ DevicePortNo: 1,
+ RootPort: true,
+ OfpPort: &ofp.OfpPort{
+ PortNo: 1,
+ Name: "port1",
+ Config: 4,
+ State: 4,
},
- {
- Id: "1002",
- DeviceId: test.deviceIds[1],
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 4,
- State: 4,
- },
+ },
+ 2: {
+ Id: "1002",
+ DeviceId: test.deviceIds[1],
+ DevicePortNo: 2,
+ RootPort: false,
+ OfpPort: &ofp.OfpPort{
+ PortNo: 2,
+ Name: "port2",
+ Config: 4,
+ State: 4,
},
- {
- Id: "1003",
- DeviceId: test.deviceIds[2],
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 4,
- State: 4,
- },
+ },
+ 3: {
+ Id: "1003",
+ DeviceId: test.deviceIds[2],
+ DevicePortNo: 3,
+ RootPort: false,
+ OfpPort: &ofp.OfpPort{
+ PortNo: 3,
+ Name: "port3",
+ Config: 4,
+ State: 4,
},
},
}
@@ -184,14 +185,15 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- for _, port := range clonedLD.Ports {
- handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), port)
+ for _, port := range lda.logicalPorts {
+ clonedPort := proto.Clone(port).(*voltha.LogicalPort)
+ handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), clonedPort)
if err != nil {
panic(err)
}
handle.Unlock()
if !created {
- t.Errorf("port %d already exists", port.OfpPort.PortNo)
+ t.Errorf("port %d already exists", clonedPort.OfpPort.PortNo)
}
}
err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
@@ -201,14 +203,14 @@
}
func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
- originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
- assert.NotNil(t, originalLogicalDevice)
+ originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
+ assert.NotNil(t, originalLogicalPorts)
var localWG sync.WaitGroup
// Change the state of the first port to FAILED
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
+ err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
assert.Nil(t, err)
localWG.Done()
}()
@@ -216,7 +218,7 @@
// Change the state of the second port to TESTING
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
+ err := ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
assert.Nil(t, err)
localWG.Done()
}()
@@ -224,9 +226,9 @@
// Change the state of the third port to UNKNOWN and then back to ACTIVE
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
+ err := ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_UNKNOWN)
assert.Nil(t, err)
- err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
+ err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
assert.Nil(t, err)
localWG.Done()
}()
@@ -262,17 +264,27 @@
meterHandle.Unlock()
}
- expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
- expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ expectedLogicalPorts := make(map[uint32]*voltha.LogicalPort)
+ for _, port := range originalLogicalPorts {
+ clonedPort := proto.Clone(port).(*voltha.LogicalPort)
+ switch clonedPort.OfpPort.PortNo {
+ case 1:
+ clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ case 2:
+ clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ case 3:
+ clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ }
+ expectedLogicalPorts[clonedPort.OfpPort.PortNo] = clonedPort
+ }
- updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(context.Background())
- for _, p := range expectedChange.Ports {
- assert.True(t, proto.Equal(p, updatedLogicalDevicePorts[p.DevicePortNo]))
+ updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(ctx)
+ assert.Equal(t, len(expectedLogicalPorts), len(updatedLogicalDevicePorts))
+ for _, p := range updatedLogicalDevicePorts {
+ assert.True(t, proto.Equal(p, expectedLogicalPorts[p.OfpPort.PortNo]))
}
globalWG.Done()
}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 4eb5ea7..cf2beed 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -348,7 +348,7 @@
// updateLogicalPort sets up a logical port on the logical device based on the device port
// information, if needed
-func (ldMgr *LogicalManager) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ldMgr *LogicalManager) updateLogicalPort(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
ldID, err := ldMgr.getLogicalDeviceID(ctx, device)
if err != nil || *ldID == "" {
// This is not an error as the logical device may not have been created at this time. In such a case,
@@ -356,7 +356,7 @@
return nil
}
if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
- if err := agent.updateLogicalPort(ctx, device, port); err != nil {
+ if err := agent.updateLogicalPort(ctx, device, devicePorts, port); err != nil {
return err
}
}
@@ -381,7 +381,7 @@
return nil
}
-func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
@@ -399,7 +399,7 @@
}
if agent := ldMgr.getLogicalDeviceAgent(ctx, logDeviceID); agent != nil {
- if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
+ if err := agent.setupUNILogicalPorts(ctx, childDevice, childDevicePorts); err != nil {
return err
}
}
@@ -442,24 +442,6 @@
return nil
}
-func (ldMgr *LogicalManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
-
- var ldID *string
- var err error
- //Get the logical device Id for this device
- if ldID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
- logger.Warnw(ctx, "no-logical-device-found", log.Fields{"deviceId": device.Id, "error": err})
- return err
- }
- if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
- if err := agent.updatePortsState(ctx, device.Id, state); err != nil {
- return err
- }
- }
- return nil
-}
-
// UpdateLogicalDeviceFlowTable updates logical device flow table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logicalDeviceId": flow.Id})
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 015c8a3..77da2f0 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -96,7 +96,7 @@
}
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
- dMgr.rootDevices[agent.deviceID] = agent.isRootdevice
+ dMgr.rootDevices[agent.deviceID] = agent.isRootDevice
}
@@ -211,14 +211,31 @@
return &empty.Empty{}, agent.deleteDevice(ctx)
}
+// GetDevicePort returns the port details for a specific device port entry
+func (dMgr *Manager) GetDevicePort(ctx context.Context, deviceID string, portID uint32) (*voltha.Port, error) {
+ logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+ agent := dMgr.getDeviceAgent(ctx, deviceID)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "device-%s", deviceID)
+ }
+ return agent.getDevicePort(portID)
+}
+
// ListDevicePorts returns the ports details for a specific device entry
func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": id.Id})
- device, err := dMgr.getDevice(ctx, id.Id)
- if err != nil {
- return &voltha.Ports{}, err
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
- return &voltha.Ports{Items: device.Ports}, nil
+
+ ports := agent.listDevicePorts()
+ ctr, ret := 0, make([]*voltha.Port, len(ports))
+ for _, port := range ports {
+ ret[ctr] = port
+ ctr++
+ }
+ return &voltha.Ports{Items: ret}, nil
}
// ListDeviceFlows returns the flow details for a specific device entry
@@ -226,7 +243,7 @@
logger.Debugw(ctx, "ListDeviceFlows", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
- return &ofp.Flows{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
+ return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
flows := agent.listDeviceFlows()
@@ -293,41 +310,46 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
+func (dMgr *Manager) listDevicePorts(ctx context.Context, id string) (map[uint32]*voltha.Port, error) {
+ logger.Debugw(ctx, "listDevicePorts", log.Fields{"deviceid": id})
+ agent := dMgr.getDeviceAgent(ctx, id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+ }
+ return agent.listDevicePorts(), nil
+}
+
// GetChildDevice will return a device, either from memory or from the dB, if present
func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
logger.Debugw(ctx, "GetChildDevice", log.Fields{"parentDeviceid": parentDeviceID, "serialNumber": serialNumber,
"parentPortNo": parentPortNo, "onuId": onuID})
- var parentDevice *voltha.Device
- var err error
- if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
+ parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
+ if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
- var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
- return nil, status.Errorf(codes.Aborted, "%s", err.Error())
- }
+ childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-devices", log.Fields{"parentDeviceId": parentDevice.Id, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw(ctx, "no-child-devices", log.Fields{"parentDeviceId": parentDeviceID, "serialNumber": serialNumber, "onuId": onuID})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
var foundChildDevice *voltha.Device
- for _, childDeviceID := range childDeviceIds {
+ for childDeviceID := range childDeviceIds {
var found bool
if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
if searchDevice.ParentPortNo == uint32(parentPortNo) {
- logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parentDeviceId": parentDevice.Id, "onuId": onuID})
+ logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parentDeviceId": parentDeviceID, "onuId": onuID})
foundOnuID = true
}
}
foundSerialNumber := false
if searchDevice.SerialNumber == serialNumber {
- logger.Debugw(ctx, "found-child-by-serialnumber", log.Fields{"parentDeviceId": parentDevice.Id, "serialNumber": serialNumber})
+ logger.Debugw(ctx, "found-child-by-serialnumber", log.Fields{"parentDeviceId": parentDeviceID, "serialNumber": serialNumber})
foundSerialNumber = true
}
@@ -347,11 +369,11 @@
}
if foundChildDevice != nil {
- logger.Debugw(ctx, "child-device-found", log.Fields{"parentDeviceId": parentDevice.Id, "foundChildDevice": foundChildDevice})
+ logger.Debugw(ctx, "child-device-found", log.Fields{"parentDeviceId": parentDeviceID, "foundChildDevice": foundChildDevice})
return foundChildDevice, nil
}
- logger.Debugw(ctx, "child-device-not-found", log.Fields{"parentDeviceId": parentDevice.Id,
+ logger.Debugw(ctx, "child-device-not-found", log.Fields{"parentDeviceId": parentDeviceID,
"serialNumber": serialNumber, "onuId": onuID, "parentPortNo": parentPortNo})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
@@ -360,22 +382,18 @@
func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
logger.Debugw(ctx, "GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
- var parentDevice *voltha.Device
- var err error
- if parentDevice, err = dMgr.getDevice(ctx, proxyAddress.DeviceId); err != nil {
+ parentDevicePorts, err := dMgr.listDevicePorts(ctx, proxyAddress.DeviceId)
+ if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
- var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
- return nil, status.Errorf(codes.Aborted, "%s", err.Error())
- }
+ childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-devices", log.Fields{"parentDeviceId": parentDevice.Id})
+ logger.Debugw(ctx, "no-child-devices", log.Fields{"parentDeviceId": proxyAddress.DeviceId})
return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
}
var foundChildDevice *voltha.Device
- for _, childDeviceID := range childDeviceIds {
+ for childDeviceID := range childDeviceIds {
if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
if searchDevice.ProxyAddress == proxyAddress {
foundChildDevice = searchDevice
@@ -522,30 +540,27 @@
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
-func (dMgr *Manager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device) error {
+func (dMgr *Manager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port) error {
logger.Debugw(ctx, "loading-parent-and-children", log.Fields{"deviceId": device.Id})
if device.Root {
// Scenario A
if device.ParentId != "" {
- // Load logical device if needed.
+ // Load logical device if needed.
if err := dMgr.logicalDeviceMgr.load(ctx, device.ParentId); err != nil {
logger.Warnw(ctx, "failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
}
} else {
logger.Debugw(ctx, "no-parent-to-load", log.Fields{"deviceId": device.Id})
}
- // Load all child devices, if needed
- if childDeviceIds, err := dMgr.getAllChildDeviceIds(ctx, device); err == nil {
- for _, childDeviceID := range childDeviceIds {
- if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
- logger.Warnw(ctx, "failure-loading-device", log.Fields{"deviceId": childDeviceID, "error": err})
- return err
- }
+ // Load all child devices, if needed
+ childDeviceIds := dMgr.getAllChildDeviceIds(ctx, devicePorts)
+ for childDeviceID := range childDeviceIds {
+ if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
+ logger.Warnw(ctx, "failure-loading-device", log.Fields{"deviceId": childDeviceID, "error": err})
+ return err
}
- logger.Debugw(ctx, "loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
- } else {
- logger.Debugw(ctx, "no-child-to-load", log.Fields{"deviceId": device.Id})
}
+ logger.Debugw(ctx, "loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
}
return nil
}
@@ -575,8 +590,10 @@
// Now we face two scenarios
if device.Root {
+ devicePorts := dAgent.listDevicePorts()
+
// Load all children as well as the parent of this device (logical_device)
- if err := dMgr.loadRootDeviceParentAndChildren(ctx, device); err != nil {
+ if err := dMgr.loadRootDeviceParentAndChildren(ctx, device, devicePorts); err != nil {
logger.Warnw(ctx, "failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceID})
return err
}
@@ -656,8 +673,9 @@
logger.Debugw(ctx, "not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
}
} else { // Should we be reconciling the root's children instead?
+ rootDevicePorts, _ := dMgr.listDevicePorts(ctx, rootDeviceID)
childManagedByAdapter:
- for _, port := range rootDevice.Ports {
+ for _, port := range rootDevicePorts {
for _, peer := range port.Peers {
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
@@ -717,11 +735,11 @@
}
func (dMgr *Manager) ReconcileChildDevices(ctx context.Context, parentDeviceID string) error {
- if parentDevice, _ := dMgr.getDeviceFromModel(ctx, parentDeviceID); parentDevice != nil {
+ if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
responses := make([]utils.Response, 0)
- for _, port := range parentDevice.Ports {
+ for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
- if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
+ if childDevice, err := dMgr.getDeviceFromModel(ctx, peer.DeviceId); err == nil {
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
}
}
@@ -758,7 +776,11 @@
if err != nil {
return err
}
- if err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port); err != nil {
+ ports, err := dMgr.listDevicePorts(ctx, deviceID)
+ if err != nil {
+ return err
+ }
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, ports, port); err != nil {
return err
}
return nil
@@ -793,7 +815,7 @@
func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
logger.Debugw(ctx, "deleteParentFlows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- if !agent.isRootdevice {
+ if !agent.isRootDevice {
return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
}
return agent.filterOutFlows(ctx, uniPort, metadata)
@@ -860,10 +882,11 @@
func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
logger.Debugw(ctx, "GetPorts", log.Fields{"deviceid": deviceID, "portType": portType})
- if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- return agent.getPorts(ctx, portType), nil
+ agent := dMgr.getDeviceAgent(ctx, deviceID)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
- return nil, status.Errorf(codes.NotFound, "%s", deviceID)
+ return agent.getPorts(ctx, portType), nil
}
func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
@@ -876,19 +899,11 @@
func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
logger.Debugw(ctx, "UpdateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
- var parentDevice *voltha.Device
- var err error
- if parentDevice, err = dMgr.getDevice(ctx, deviceID); err != nil {
+ parentDevicePorts, err := dMgr.listDevicePorts(ctx, deviceID)
+ if err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
}
- var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
- return status.Errorf(codes.Aborted, "%s", err.Error())
- }
- if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
- }
- for _, childDeviceID := range childDeviceIds {
+ for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
if err = agent.updateDeviceStatus(ctx, operStatus, connStatus); err != nil {
return status.Errorf(codes.Aborted, "childDevice:%s, error:%s", childDeviceID, err.Error())
@@ -952,37 +967,20 @@
}
//UpdatePortsState updates all ports on the device
-func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
+func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, portTypeFilter uint32, state voltha.OperStatus_Types) error {
logger.Debugw(ctx, "UpdatePortsState", log.Fields{"deviceid": deviceID})
-
- if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- switch state {
- case voltha.OperStatus_ACTIVE:
- if err := agent.updatePortsOperState(ctx, state); err != nil {
- logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
- return err
- }
- case voltha.OperStatus_UNKNOWN:
- if err := agent.updatePortsOperState(ctx, state); err != nil {
- logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
- return err
- }
- default:
- return status.Error(codes.Unimplemented, "state-change-not-implemented")
- }
- // Notify the logical device about the state change
- device, err := dMgr.getDevice(ctx, deviceID)
- if err != nil {
- logger.Warnw(ctx, "non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
- return err
- }
- if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, state); err != nil {
- logger.Warnw(ctx, "failed-updating-ports-state", log.Fields{"deviceId": deviceID, "error": err})
- return err
- }
- return nil
+ agent := dMgr.getDeviceAgent(ctx, deviceID)
+ if agent == nil {
+ return status.Errorf(codes.NotFound, "%s", deviceID)
}
- return status.Errorf(codes.NotFound, "%s", deviceID)
+ if state != voltha.OperStatus_ACTIVE && state != voltha.OperStatus_UNKNOWN {
+ return status.Error(codes.Unimplemented, "state-change-not-implemented")
+ }
+ if err := agent.updatePortsOperState(ctx, portTypeFilter, state); err != nil {
+ logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
+ return err
+ }
+ return nil
}
func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
@@ -1176,9 +1174,8 @@
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
logger.Debug(ctx, "ChildDevicesLost")
- var err error
- var parentDevice *voltha.Device
- if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
+ parentDevice, err := dMgr.getDevice(ctx, parentDeviceID)
+ if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
@@ -1189,34 +1186,28 @@
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
logger.Debug(ctx, "ChildDevicesDetected")
- var err error
- var parentDevice *voltha.Device
- var childDeviceIds []string
-
- if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
+ parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
+ if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
-
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
- }
+ childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ logger.Debugw(ctx, "no-child-device", log.Fields{"parentDeviceId": parentDeviceID})
}
allChildEnableRequestSent := true
- for _, childDeviceID := range childDeviceIds {
+ for childDeviceID := range childDeviceIds {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
// Run the children re-registration in its own routine
- go func() {
- err = agent.enableDevice(context.Background())
+ go func(ctx context.Context) {
+ err = agent.enableDevice(ctx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
- }()
+ }(context.Background())
} else {
err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
- logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parentDeviceId": parentDevice.Id, "childId": childDeviceID})
+ logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parentDeviceId": parentDeviceID, "childId": childDeviceID})
allChildEnableRequestSent = false
}
}
@@ -1234,17 +1225,10 @@
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug(ctx, "DisableAllChildDevices")
- var childDeviceIds []string
- var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
- }
- if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
- }
- for _, childDeviceID := range childDeviceIds {
+ ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
+ for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
- if err = agent.disableDevice(ctx); err != nil {
+ if err := agent.disableDevice(ctx); err != nil {
// Just log the error - this error happens only if the child device was already in deleted state.
logger.Errorw(ctx, "failure-disable-device", log.Fields{"deviceId": childDeviceID, "error": err.Error()})
}
@@ -1256,17 +1240,10 @@
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug(ctx, "DeleteAllChildDevices")
- var childDeviceIds []string
- var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
- }
- if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
- }
- for _, childDeviceID := range childDeviceIds {
+ ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
+ for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
- if err = agent.deleteDevice(ctx); err != nil {
+ if err := agent.deleteDevice(ctx); err != nil {
logger.Warnw(ctx, "failure-delete-device", log.Fields{"deviceId": childDeviceID, "error": err.Error()})
}
// No further action is required here. The deleteDevice will change the device state where the resulting
@@ -1300,30 +1277,26 @@
}
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
-func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevice *voltha.Device) ([]string, error) {
- logger.Debugw(ctx, "getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
- childDeviceIds := make([]string, 0)
- if parentDevice != nil {
- for _, port := range parentDevice.Ports {
- for _, peer := range port.Peers {
- childDeviceIds = append(childDeviceIds, peer.DeviceId)
- }
+func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevicePorts map[uint32]*voltha.Port) map[string]struct{} {
+ logger.Debug(ctx, "getAllChildDeviceIds")
+ childDeviceIds := make(map[string]struct{}, len(parentDevicePorts))
+ for _, port := range parentDevicePorts {
+ for _, peer := range port.Peers {
+ childDeviceIds[peer.DeviceId] = struct{}{}
}
- logger.Debugw(ctx, "returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
}
- return childDeviceIds, nil
+ logger.Debugw(ctx, "returning-getAllChildDeviceIds", log.Fields{"childDeviceIds": childDeviceIds})
+ return childDeviceIds
}
//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
logger.Debugw(ctx, "GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
- if parentDevice, err := dMgr.getDevice(ctx, parentDeviceID); err == nil {
+ if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
- if childDeviceIds, er := dMgr.getAllChildDeviceIds(ctx, parentDevice); er == nil {
- for _, deviceID := range childDeviceIds {
- if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
- childDevices = append(childDevices, d)
- }
+ for deviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
+ if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
+ childDevices = append(childDevices, d)
}
}
return &voltha.Devices{Items: childDevices}, nil
@@ -1334,7 +1307,11 @@
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Info(ctx, "addUNILogicalPort")
- if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
+ cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
+ if err != nil {
+ return err
+ }
+ if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
logger.Warnw(ctx, "addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
return err
}
@@ -1529,7 +1506,7 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
- return &empty.Empty{}, agent.enablePort(ctx, port)
+ return &empty.Empty{}, agent.enablePort(ctx, port.PortNo)
}
func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
@@ -1538,7 +1515,7 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
- return &empty.Empty{}, agent.disablePort(ctx, port)
+ return &empty.Empty{}, agent.disablePort(ctx, port.PortNo)
}
// ChildDeviceLost calls parent adapter to delete child device and all its references
diff --git a/rw_core/core/device/port/common.go b/rw_core/core/device/port/common.go
new file mode 100644
index 0000000..df435c1
--- /dev/null
+++ b/rw_core/core/device/port/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package port
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "port"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device/port/loader.go b/rw_core/core/device/port/loader.go
new file mode 100644
index 0000000..479695a
--- /dev/null
+++ b/rw_core/core/device/port/loader.go
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package port
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to port state updates
+type Loader struct {
+ dbProxy *model.Proxy
+ // this lock protects the ports map, it does not protect individual ports
+ lock sync.RWMutex
+ ports map[uint32]*chunk
+}
+
+// chunk keeps a port and the lock for this port
+type chunk struct {
+ // this lock is used to synchronize all access to the port, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ port *voltha.Port
+}
+
+func NewLoader(dbProxy *model.Proxy) *Loader {
+ return &Loader{
+ dbProxy: dbProxy,
+ ports: make(map[uint32]*chunk),
+ }
+}
+
+// Load queries existing ports from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+ loader.lock.Lock()
+ defer loader.lock.Unlock()
+
+ var ports []*voltha.Port
+ if err := loader.dbProxy.List(ctx, &ports); err != nil {
+ logger.Errorw(ctx, "failed-to-list-ports-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, port := range ports {
+ loader.ports[port.PortNo] = &chunk{port: port}
+ }
+}
+
+// LockOrCreate locks this port if it exists, or creates a new port if it does not.
+// In the case of port creation, the provided "port" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, port *voltha.Port) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := loader.Lock(port.PortNo); have {
+ return handle, false, nil
+ }
+
+ loader.lock.Lock()
+ entry, have := loader.ports[port.PortNo]
+ if !have {
+ entry := &chunk{port: port}
+ loader.ports[port.PortNo] = entry
+ entry.lock.Lock()
+ loader.lock.Unlock()
+
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(port.PortNo), port); err != nil {
+ // revert the map
+ loader.lock.Lock()
+ delete(loader.ports, port.PortNo)
+ loader.lock.Unlock()
+
+ entry.deleted = true
+ entry.lock.Unlock()
+ return nil, false, err
+ }
+ return &Handle{loader: loader, chunk: entry}, true, nil
+ }
+ loader.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.LockOrCreate(ctx, port)
+ }
+ return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this port, and returns a handle which can be used to access the port until it's unlocked.
+// This handle ensures that the port cannot be accessed if the lock is not held.
+// Returns false if the port is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint32) (*Handle, bool) {
+ loader.lock.RLock()
+ entry, have := loader.ports[id]
+ loader.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.Lock(id)
+ }
+ return &Handle{loader: loader, chunk: entry}, true
+}
+
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
+type Handle struct {
+ loader *Loader
+ chunk *chunk
+}
+
+// GetReadOnly returns an *voltha.Port which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *voltha.Port {
+ return h.chunk.port
+}
+
+// Update updates an existing port in the kv.
+// The provided "port" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, port *voltha.Port) error {
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(port.PortNo), port); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-port-%v: %s", port.PortNo, err)
+ }
+ h.chunk.port = port
+ return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.port.PortNo)); err != nil {
+ return fmt.Errorf("couldnt-delete-port-from-store-%v", h.chunk.port.PortNo)
+ }
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.ports, h.chunk.port.PortNo)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the port
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the port through this handle in future will panic
+ }
+}
+
+// ListIDs returns a snapshot of all the managed port IDs
+// TODO: iterating through ports safely is expensive now, since all ports are stored & locked separately
+// should avoid this where possible
+func (loader *Loader) ListIDs() map[uint32]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(loader.ports))
+ for id := range loader.ports {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
diff --git a/rw_core/core/device/remote/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
index 5ab0127..46de80f 100755
--- a/rw_core/core/device/remote/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -18,6 +18,10 @@
import (
"context"
"crypto/rand"
+ "strings"
+ "testing"
+ "time"
+
"github.com/golang/protobuf/ptypes"
any2 "github.com/golang/protobuf/ptypes/any"
cm "github.com/opencord/voltha-go/rw_core/mocks"
@@ -31,9 +35,6 @@
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "strings"
- "testing"
- "time"
)
const (