[VOL-3245] Consume port capability on port creation
This commit consists of the changes needed in the Core to build
a logical port without having to request port capability from
the adapters.
This commit depends on corresponding changes in the OLT and ONU
adapters, which are tracked in Jira at
https://jira.opencord.org/browse/VOL-3202. Until those adapter
changes are committed, this commit will not pass Jenkins.
Change-Id: I0cae167375b9a8c67a83202e25abcec6ae013a88
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index 2d7d210..25c06e9 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -21,9 +21,7 @@
"fmt"
"github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -43,32 +41,6 @@
return ports
}
-// getPortCapability retrieves the port capability of a device
-func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
- logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
- device, err := agent.getDevice(ctx)
- if err != nil {
- return nil, err
- }
- ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
- if err != nil {
- return nil, err
- }
- // Wait for adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed")
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
- // Successful response
- portCap := &ic.PortCapability{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return portCap, nil
-}
func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index ee4e77d..2901fc4 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -313,22 +313,23 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
- logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+ logger.Debugw("addNNILogicalPort", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
label := fmt.Sprintf("nni-%d", port.PortNo)
- tmpPort := &voltha.LogicalPort{
+ ofpPort := *port.OfpPort
+ ofpPort.HwAddr = append([]uint32{}, port.OfpPort.HwAddr...)
+ ofpPort.PortNo = port.PortNo
+ ofpPort.Name = label
+ nniPort := &voltha.LogicalPort{
RootPort: true,
DeviceId: device.Id,
Id: label,
DevicePortNo: port.PortNo,
- OfpPort: &voltha.OfpPort{
- PortNo: port.PortNo,
- Name: label,
- },
+ OfpPort: &ofpPort,
OfpPortStats: &ofp.OfpPortStats{},
}
- portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, nniPort)
if err != nil {
return err
}
@@ -339,50 +340,20 @@
return nil
}
- // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
- // the port capability request that the Core makes following a port create event.
- // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
- // external request runs), this is a temporary hack to avoid updating port state before the port is ready
-
- // First get the port capability
- portCap, err := agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo)
- if err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
- }
-
- newPort := portCap.Port
- newPort.RootPort = true
- newPort.DeviceId = device.Id
- newPort.Id = label
- newPort.DevicePortNo = port.PortNo
- newPort.OfpPort.PortNo = port.PortNo
- newPort.OfpPort.Name = label
-
- // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
- // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
- // while doing this, and can be removed later.
- if err := portHandle.Update(ctx, newPort); err != nil {
- if err := portHandle.Delete(context.Background()); err != nil {
- return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
- }
- return err
- }
-
// ensure that no events will be sent until this one is
queuePosition := agent.orderedEvents.assignQueuePosition()
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), device, newPort, agent.listLogicalDevicePorts()); err != nil {
+ if err := agent.updateRoutes(context.Background(), device, nniPort, agent.listLogicalDevicePorts()); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
- logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
}
// send event, and allow any queued events to be sent as well
- queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, nniPort.OfpPort)
}()
return nil
}
@@ -397,19 +368,19 @@
logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
return nil
}
-
- tmpPort := &voltha.LogicalPort{
+ ofpPort := *port.OfpPort
+ ofpPort.HwAddr = append([]uint32{}, port.OfpPort.HwAddr...)
+ ofpPort.PortNo = port.PortNo
+ uniPort := &voltha.LogicalPort{
RootPort: false,
DeviceId: childDevice.Id,
Id: port.Label,
DevicePortNo: port.PortNo,
- OfpPort: &voltha.OfpPort{
- PortNo: port.PortNo,
- },
+ OfpPort: &ofpPort,
OfpPortStats: &ofp.OfpPortStats{},
}
- portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, uniPort)
if err != nil {
return err
}
@@ -420,49 +391,20 @@
return nil
}
- // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
- // the port capability request that the Core makes following a port create event.
- // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
- // external request runs), this is a temporary hack to avoid updating port state before the port is ready
-
- // First get the port capability
- portCap, err := agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo)
- if err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
- }
-
- logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- newPort := portCap.Port
- newPort.RootPort = false
- newPort.DeviceId = childDevice.Id
- newPort.Id = port.Label
- newPort.DevicePortNo = port.PortNo
- newPort.OfpPort.PortNo = port.PortNo
-
- // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
- // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
- // while doing this, and can be removed later.
- if err := portHandle.Update(ctx, newPort); err != nil {
- if err := portHandle.Delete(context.Background()); err != nil {
- return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
- }
- return err
- }
-
// ensure that no events will be sent until this one is
queuePosition := agent.orderedEvents.assignQueuePosition()
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), childDevice, newPort, agent.listLogicalDevicePorts()); err != nil {
+ if err := agent.updateRoutes(context.Background(), childDevice, uniPort, agent.listLogicalDevicePorts()); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
- logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
}
+
// send event, and allow any queued events to be sent as well
- queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
}()
return nil
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 6f8d899..51da0e1 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -866,14 +866,6 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *Manager) getPortCapability(ctx context.Context, deviceID string, portNo uint32) (*ic.PortCapability, error) {
- logger.Debugw("getPortCapability", log.Fields{"deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- return agent.getPortCapability(ctx, portNo)
- }
- return nil, status.Errorf(codes.NotFound, "%s", deviceID)
-}
-
func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
logger.Debugw("UpdateDeviceStatus", log.Fields{"deviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index 4cbb363..6bb318b 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -166,21 +166,6 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// GetOfpPortInfo invokes get ofp port info rpc
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
- logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
- if err != nil {
- return nil, err
- }
- args := []*kafka.KVArg{
- {Key: "device", Value: device},
- {Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
- }
- replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
-}
-
// ReconcileDevice invokes reconcile device rpc
func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
diff --git a/rw_core/core/device/remote/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
index 17627dc..b9755c7 100755
--- a/rw_core/core/device/remote/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -178,24 +178,6 @@
assert.Equal(t, switchCap.String(), expectedCap.String())
}
-func testGetPortInfoFromAdapter(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
- d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- portNo := uint32(1)
- rpcResponse, err := ap.GetOfpPortInfo(ctx, d, portNo)
- assert.Nil(t, err)
- response, err := waitForResponse(ctx, rpcResponse)
- assert.Nil(t, err)
- portCap := &ic.PortCapability{}
- err = ptypes.UnmarshalAny(response, portCap)
- assert.Nil(t, err)
- assert.NotNil(t, portCap)
- expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
- assert.Equal(t, portCap.String(), expectedPortInfo.String())
-}
-
func testPacketOut(t *testing.T) {
ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
@@ -243,15 +225,12 @@
//2. Test get switch capability
testGetSwitchCapabilityFromAdapter(t)
- //3. Test get port info
- testGetPortInfoFromAdapter(t)
-
- //4. Test PacketOut
+ //3. Test PacketOut
testPacketOut(t)
- // 5. Test flow updates
+ //4. Test flow updates
testFlowUpdates(t)
- //6. Pm configs
+ //5. Pm configs
testPmUpdates(t)
}
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 7bad0c2..7509f1f 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -208,27 +208,6 @@
}, nil
}
-// Get_ofp_port_info -
-func (ta *Adapter) Get_ofp_port_info(device *voltha.Device, portNo int64) (*ic.PortCapability, error) { // nolint
- capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
- return &ic.PortCapability{
- Port: &voltha.LogicalPort{
- OfpPort: &of.OfpPort{
- HwAddr: macAddressToUint32Array("11:11:33:44:55:66"),
- Config: 0,
- State: uint32(of.OfpPortState_OFPPS_LIVE),
- Curr: capability,
- Advertised: capability,
- Peer: capability,
- CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- },
- DeviceId: device.Id,
- DevicePortNo: uint32(portNo),
- },
- }, nil
-}
-
// Process_inter_adapter_message -
func (ta *Adapter) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error { // nolint
return nil
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 61f431a..c567e40 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -60,11 +60,22 @@
if res := oltA.coreProxy.DeviceUpdate(context.TODO(), d); res != nil {
logger.Fatalf("deviceUpdate-failed-%s", res)
}
+ capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
nniPort := &voltha.Port{
PortNo: 2,
Label: fmt.Sprintf("nni-%d", 2),
Type: voltha.Port_ETHERNET_NNI,
OperStatus: voltha.OperStatus_ACTIVE,
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array("11:22:33:44:55:66"),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: capability,
+ Advertised: capability,
+ Peer: capability,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
}
var err error
if err = oltA.coreProxy.PortCreated(context.TODO(), d.Id, nniPort); err != nil {
@@ -138,30 +149,6 @@
}, nil
}
-// Get_ofp_port_info returns ofp port info
-func (oltA *OLTAdapter) Get_ofp_port_info(device *voltha.Device, portNo int64) (*ic.PortCapability, error) { // nolint
- if d := oltA.getDevice(device.Id); d == nil {
- logger.Fatalf("device-not-found-%s", device.Id)
- }
- capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
- return &ic.PortCapability{
- Port: &voltha.LogicalPort{
- OfpPort: &of.OfpPort{
- HwAddr: macAddressToUint32Array("11:22:33:44:55:66"),
- Config: 0,
- State: uint32(of.OfpPortState_OFPPS_LIVE),
- Curr: capability,
- Advertised: capability,
- Peer: capability,
- CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- },
- DeviceId: device.Id,
- DevicePortNo: uint32(portNo),
- },
- }, nil
-}
-
// GetNumONUPerOLT returns number of ONUs per OLT
func (oltA *OLTAdapter) GetNumONUPerOLT() int {
return numONUPerOLT
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 217f01d..02c885e 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -70,12 +70,24 @@
}
}
+ capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
uniPort := &voltha.Port{
PortNo: uniPortNo,
Label: fmt.Sprintf("uni-%d", uniPortNo),
Type: voltha.Port_ETHERNET_UNI,
OperStatus: voltha.OperStatus_ACTIVE,
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array("12:12:12:12:12:12"),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: capability,
+ Advertised: capability,
+ Peer: capability,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
}
+
var err error
if err = onuA.coreProxy.PortCreated(context.TODO(), d.Id, uniPort); err != nil {
logger.Fatalf("PortCreated-failed-%s", err)
@@ -114,30 +126,6 @@
return nil
}
-// Get_ofp_port_info returns ofp device info
-func (onuA *ONUAdapter) Get_ofp_port_info(device *voltha.Device, portNo int64) (*ic.PortCapability, error) { // nolint
- if d := onuA.getDevice(device.Id); d == nil {
- logger.Fatalf("device-not-found-%s", device.Id)
- }
- capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
- return &ic.PortCapability{
- Port: &voltha.LogicalPort{
- OfpPort: &of.OfpPort{
- HwAddr: macAddressToUint32Array("12:12:12:12:12:12"),
- Config: 0,
- State: uint32(of.OfpPortState_OFPPS_LIVE),
- Curr: capability,
- Advertised: capability,
- Peer: capability,
- CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- },
- DeviceId: device.Id,
- DevicePortNo: uint32(portNo),
- },
- }, nil
-}
-
// Disable_device disables device
func (onuA *ONUAdapter) Disable_device(device *voltha.Device) error { // nolint
go func() {