VOL-3121 - Separated out logical ports from logical agent.
Similar to flows/groups/meters.
Also modified device_route tests to generate unique port IDs (`.OfpPort.PortNo`s) across all UNI ports withing each test, i.e. within an OLT.
Also replaced logicalPortsNo map & associated NNI vs UNI logic with root device checks.
Change-Id: Ib0cecbf7d4f8d509ce7c989b9ccf697c8b0d17d6
diff --git a/VERSION b/VERSION
index 8e8299d..6cdfe8d 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.2
+2.4.3-dev
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index 097b1c1..6fcd511 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -36,6 +36,7 @@
)
type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
+type isLogicalDevicePortsConditionSatisfied func(ports []*voltha.LogicalPort) bool
type isDeviceConditionSatisfied func(ld *voltha.Device) bool
type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
@@ -130,6 +131,41 @@
}
}
+func waitUntilLogicalDevicePortsReadiness(oltDeviceID string,
+ timeout time.Duration,
+ nbi *NBIHandler,
+ verificationFunction isLogicalDevicePortsConditionSatisfied,
+) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ // Get the logical device from the olt device
+ d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
+ if d != nil && d.ParentId != "" {
+ ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: d.ParentId})
+ if err == nil && verificationFunction(ports.Items) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
+ }
+}
+
func waitUntilConditionForDevices(timeout time.Duration, nbi *NBIHandler, verificationFunction isDevicesConditionSatisfied) error {
ch := make(chan int, 1)
done := false
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 5801f27..2eff4a4 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -150,6 +150,9 @@
assert.Equal(t, 1, len(logicalDevices.Items))
ld := logicalDevices.Items[0]
+ ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: ld.Id})
+ assert.Nil(t, err)
+
assert.NotEqual(t, "", ld.Id)
assert.NotEqual(t, uint64(0), ld.DatapathId)
assert.Equal(t, "olt_adapter_mock", ld.Desc.HwDesc)
@@ -159,7 +162,7 @@
assert.Equal(t, uint32(256), ld.SwitchFeatures.NBuffers)
assert.Equal(t, uint32(2), ld.SwitchFeatures.NTables)
assert.Equal(t, uint32(15), ld.SwitchFeatures.Capabilities)
- assert.Equal(t, 1+nb.numONUPerOLT, len(ld.Ports))
+ assert.Equal(t, 1+nb.numONUPerOLT, len(ports.Items))
assert.Equal(t, oltDevice.ParentId, ld.Id)
//Expected port no
expectedPortNo := make(map[uint32]bool)
@@ -167,7 +170,7 @@
for i := 0; i < nb.numONUPerOLT; i++ {
expectedPortNo[uint32(i+100)] = false
}
- for _, p := range ld.Ports {
+ for _, p := range ports.Items {
assert.Equal(t, p.OfpPort.PortNo, p.DevicePortNo)
assert.Equal(t, uint32(4), p.OfpPort.State)
expectedPortNo[p.OfpPort.PortNo] = true
@@ -393,10 +396,10 @@
assert.Nil(t, err)
// Wait for the logical device to be in the ready state
- var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
- return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
assert.Nil(t, err)
// Verify that the devices have been setup correctly
@@ -439,11 +442,8 @@
}
// Wait for the logical device to satisfy the expected condition
- var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
- if ld == nil {
- return false
- }
- for _, lp := range ld.Ports {
+ var vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
return false
@@ -451,7 +451,7 @@
}
return true
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
// Reenable the oltDevice
@@ -474,11 +474,8 @@
}
// Wait for the logical device to satisfy the expected condition
- vlFunction = func(ld *voltha.LogicalDevice) bool {
- if ld == nil {
- return false
- }
- for _, lp := range ld.Ports {
+ vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
return false
@@ -486,7 +483,7 @@
}
return true
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
}
@@ -581,10 +578,10 @@
assert.Nil(t, err)
// Wait for the logical device to be in the ready state
- var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
- return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
assert.Nil(t, err)
//Get all child devices
@@ -664,11 +661,8 @@
err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
- var vlFunction = func(ld *voltha.LogicalDevice) bool {
- if ld == nil {
- return false
- }
- for _, lp := range ld.Ports {
+ var vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
return false
@@ -676,7 +670,7 @@
}
return true
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
// Enable the NW Port of oltDevice
@@ -695,11 +689,8 @@
err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
- vlFunction = func(ld *voltha.LogicalDevice) bool {
- if ld == nil {
- return false
- }
- for _, lp := range ld.Ports {
+ vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
return false
@@ -707,7 +698,7 @@
}
return true
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
// Disable a non-PON port
@@ -876,10 +867,10 @@
assert.Nil(t, err)
// Wait for the logical device to be in the ready state
- var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
- return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
assert.Nil(t, err)
// Wait for the olt device to be enabled
@@ -939,11 +930,11 @@
assert.Nil(t, err)
}
-func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *NBIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *NBIHandler, logicalDeviceID string, ports []*voltha.LogicalPort, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
// Send flows for the parent device
var nniPorts []*voltha.LogicalPort
var uniPorts []*voltha.LogicalPort
- for _, p := range logicalDevice.Ports {
+ for _, p := range ports {
if p.RootPort {
nniPorts = append(nniPorts, p)
} else {
@@ -966,7 +957,7 @@
flows.Output(controllerPortMask),
},
}
- flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
assert.Nil(t, err)
@@ -983,7 +974,7 @@
flows.Output(controllerPortMask),
},
}
- flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
assert.Nil(t, err)
@@ -1000,7 +991,7 @@
flows.Output(controllerPortMask),
},
}
- flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
assert.Nil(t, err)
@@ -1059,9 +1050,13 @@
}
// Ensure there are both NNI ports and at least one UNI port on the logical device
ld := lds.Items[0]
+ ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: ld.Id})
+ if err != nil {
+ return false
+ }
nniPort := false
uniPort := false
- for _, p := range ld.Ports {
+ for _, p := range ports.Items {
nniPort = nniPort || p.RootPort == true
uniPort = uniPort || p.RootPort == false
if nniPort && uniPort {
@@ -1078,7 +1073,7 @@
assert.NotNil(t, logicalDevices)
assert.Equal(t, 1, len(logicalDevices.Items))
- logicalDevice := logicalDevices.Items[0]
+ logicalDeviceID := logicalDevices.Items[0].Id
meterID := rand.Uint32()
// Add a meter to the logical device
@@ -1094,12 +1089,15 @@
},
},
}
- _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+ _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDeviceID, MeterMod: meterMod})
+ assert.Nil(t, err)
+
+ ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: logicalDeviceID})
assert.Nil(t, err)
// Send initial set of Trap flows
startingVlan := 4091
- nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+ nb.sendTrapFlows(t, nbi, logicalDeviceID, ports.Items, uint64(meterID), startingVlan)
// Listen for port events
start := time.Now()
@@ -1113,7 +1111,7 @@
if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
processedUniLogicalPorts++
- nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+ nb.sendEAPFlows(t, nbi, logicalDeviceID, ps.Desc, startingVlan, uint64(meterID))
} else {
processedNniLogicalPorts++
}
@@ -1174,10 +1172,10 @@
assert.Nil(t, err)
// Wait for the logical device to be in the ready state
- var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
- return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
}
- err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
assert.Nil(t, err)
// Verify that the devices have been setup correctly
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index fe379d6..00a054e 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -378,7 +378,7 @@
}
updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.List()
+ flowIDs := agent.flowLoader.ListIDs()
for flowID := range flowIDs {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
@@ -468,7 +468,7 @@
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
@@ -579,7 +579,7 @@
}
updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.List()
+ flowIDs := agent.flowLoader.ListIDs()
for flowID := range flowIDs {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
@@ -646,7 +646,7 @@
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
@@ -721,7 +721,7 @@
func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
var flowsToDelete []*ofp.OfpFlowStats
// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
- for flowID := range agent.flowLoader.List() {
+ for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
flow := flowHandle.GetReadOnly()
if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
@@ -764,7 +764,7 @@
}
updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.List()
+ flowIDs := agent.flowLoader.ListIDs()
for flowID := range flowIDs {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
@@ -853,7 +853,7 @@
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
@@ -942,7 +942,7 @@
func (agent *Agent) deleteAllFlows(ctx context.Context) error {
logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
- for flowID := range agent.flowLoader.List() {
+ for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index bb8fe08..58f9aa9 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -22,7 +22,7 @@
// listDeviceFlows returns device flows
func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
- flowIDs := agent.flowLoader.List()
+ flowIDs := agent.flowLoader.ListIDs()
flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
for flowID := range flowIDs {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 18ac83a..cb9557c 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -22,7 +22,7 @@
// listDeviceGroups returns logical device flow groups
func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index c205564..16cd08b 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -18,11 +18,12 @@
import (
"encoding/hex"
+ "sync"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "sync"
)
type Manager struct {
@@ -123,13 +124,17 @@
return nil
}
-func (q *Manager) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
- // TODO: validate the type of portStatus parameter
- //if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
- //}
- event := openflow_13.ChangeEvent{Id: deviceID, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
- logger.Debugw("SendChangeEvent", log.Fields{"event": event})
- q.changeEventQueue <- event
+func (q *Manager) SendChangeEvent(deviceID string, reason openflow_13.OfpPortReason, desc *openflow_13.OfpPort) {
+ logger.Debugw("SendChangeEvent", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
+ q.changeEventQueue <- openflow_13.ChangeEvent{
+ Id: deviceID,
+ Event: &openflow_13.ChangeEvent_PortStatus{
+ PortStatus: &openflow_13.OfpPortStatus{
+ Reason: reason,
+ Desc: desc,
+ },
+ },
+ }
}
// ReceiveChangeEvents receives change in events
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
index b407a3b..25060ab 100644
--- a/rw_core/core/device/flow/loader.go
+++ b/rw_core/core/device/flow/loader.go
@@ -30,11 +30,10 @@
// Loader hides all low-level locking & synchronization related to flow state updates
type Loader struct {
+ dbProxy *model.Proxy
// this lock protects the flows map, it does not protect individual flows
lock sync.RWMutex
flows map[uint64]*chunk
-
- dbProxy *model.Proxy
}
// chunk keeps a flow and the lock for this flow
@@ -48,8 +47,8 @@
func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- flows: make(map[uint64]*chunk),
dbProxy: dbProxy,
+ flows: make(map[uint64]*chunk),
}
}
@@ -86,8 +85,6 @@
loader.lock.Unlock()
if err := loader.dbProxy.Set(ctx, fmt.Sprint(flow.Id), flow); err != nil {
- logger.Errorw("failed-adding-flow-to-db", log.Fields{"flowID": flow.Id, "err": err})
-
// revert the map
loader.lock.Lock()
delete(loader.flows, flow.Id)
@@ -109,7 +106,7 @@
return &Handle{loader: loader, chunk: entry}, false, nil
}
-// Lock acquires the lock for this flow, and returns a handle which can be used to access the meter until it's unlocked.
+// Lock acquires the lock for this flow, and returns a handle which can be used to access the flow until it's unlocked.
// This handle ensures that the flow cannot be accessed if the lock is not held.
// Returns false if the flow is not present.
// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
@@ -130,6 +127,8 @@
return &Handle{loader: loader, chunk: entry}, true
}
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
type Handle struct {
loader *Loader
chunk *chunk
@@ -173,10 +172,10 @@
}
}
-// List returns a snapshot of all the managed flow IDs
+// ListIDs returns a snapshot of all the managed flow IDs
// TODO: iterating through flows safely is expensive now, since all flows are stored & locked separately
// should avoid this where possible
-func (loader *Loader) List() map[uint64]struct{} {
+func (loader *Loader) ListIDs() map[uint64]struct{} {
loader.lock.RLock()
defer loader.lock.RUnlock()
// copy the IDs so caller can safely iterate
diff --git a/rw_core/core/device/flow/loader_test.go b/rw_core/core/device/flow/loader_test.go
index b739272..8973f12 100644
--- a/rw_core/core/device/flow/loader_test.go
+++ b/rw_core/core/device/flow/loader_test.go
@@ -22,34 +22,43 @@
"os"
"regexp"
"strconv"
- "strings"
"testing"
)
// TestLoadersIdentical ensures that the group, flow, and meter loaders always have an identical implementation.
func TestLoadersIdentical(t *testing.T) {
+ types := []string{"flow", "group", "meter", "logical_port"}
+
identical := [][]string{
- {"ofp\\.OfpFlowStats", "ofp\\.OfpGroupEntry", "ofp\\.OfpMeterEntry"},
- {"\\.Id", "\\.Desc\\.GroupId", "\\.Config.MeterId"},
- {"uint64", "uint32", "uint32"},
- {"Flow", "Group", "Meter"},
- {"flow", "group", "meter"},
+ {`ofp\.OfpFlowStats`, `ofp\.OfpGroupEntry`, `ofp\.OfpMeterEntry`, `voltha\.LogicalPort`},
+ {`\.Id`, `\.Desc\.GroupId`, `\.Config\.MeterId`, `\.OfpPort\.PortNo`},
+ {`uint64`, `uint32`, `uint32`, `uint32`},
+ {`Flow`, `Group`, `Meter`, `Port`},
+ {`flow`, `group`, `meter`, `port|logical_port`},
}
- regexes := make([]*regexp.Regexp, len(identical))
+ regexes := make([][]*regexp.Regexp, len(identical[0]))
+ for i := range regexes {
+ regexes[i] = make([]*regexp.Regexp, len(identical))
+ }
for i, group := range identical {
- regexes[i] = regexp.MustCompile(strings.Join(group, "|"))
+ for j, regexStr := range group {
+ // convert from column-wise to row-wise for convenience
+ regexes[j][i] = regexp.MustCompile(regexStr)
+ }
}
- for i := 1; i < len(identical[0]); i++ {
- if err := compare(regexes, "../"+identical[4][0]+"/loader.go", "../"+identical[4][i]+"/loader.go"); err != nil {
+ for i := 1; i < len(types); i++ {
+ if err := compare(regexes[0], regexes[i],
+ "../"+types[0]+"/loader.go",
+ "../"+types[i]+"/loader.go"); err != nil {
t.Error(err)
return
}
}
}
-func compare(regexes []*regexp.Regexp, fileNameA, fileNameB string) error {
+func compare(regexesA, regexesB []*regexp.Regexp, fileNameA, fileNameB string) error {
fileA, err := os.Open(fileNameA)
if err != nil {
return err
@@ -64,43 +73,63 @@
scannerA, scannerB := bufio.NewScanner(fileA), bufio.NewScanner(fileB)
- spaceRegex := regexp.MustCompile(" +")
+ // treat any number of spaces as a single space
+ spaceRegex := regexp.MustCompile(` +`)
+ // extra lines are permitted before a "blank" line, or before a lock/unlock
+ spacerRegex := regexp.MustCompile(`^(?:[^a-z]*|.*Lock\(\)|.*Unlock\(\))$`)
+ // ignore import type differences
+ libGoImportRegex := regexp.MustCompile(`^.*github\.com/opencord/voltha-protos/.*$`)
- line := 1
+ lineA, lineB := 1, 1
+linesLoop:
for {
- if continueA, continueB := scannerA.Scan(), scannerB.Scan(); continueA != continueB {
- if !continueA && continueB {
- if err := scannerA.Err(); err != nil {
- return err
+ if continueA, continueB := scannerA.Scan(), scannerB.Scan(); !continueA || !continueB {
+ // EOF
+ break linesLoop
+ }
+ textA, textB := scannerA.Text(), scannerB.Text()
+
+ // allow any number of "extra" lines just before a spacer line
+ for {
+ isSpacerA, isSpacerB := spacerRegex.MatchString(textA), spacerRegex.MatchString(textB)
+ if isSpacerA && !isSpacerB {
+ if !scannerB.Scan() {
+ // EOF
+ break linesLoop
}
- }
- if continueA && !continueB {
- if err := scannerB.Err(); err != nil {
- return err
+ lineB++
+ textB = scannerB.Text()
+ continue
+ } else if isSpacerB && !isSpacerA {
+ if !scannerA.Scan() {
+ // EOF
+ break linesLoop
}
+ lineA++
+ textA = scannerA.Text()
+ continue
}
- return fmt.Errorf("line %d: files are not the same length", line)
- } else if !continueA {
- // EOF from both files
break
}
- textA, textB := scannerA.Text(), scannerB.Text()
-
replacedA, replacedB := textA, textB
- for i, regex := range regexes {
+ for i := range regexesA {
replacement := "{{type" + strconv.Itoa(i) + "}}"
- replacedA, replacedB = regex.ReplaceAllString(replacedA, replacement), regex.ReplaceAllString(replacedB, replacement)
+ replacedA, replacedB = regexesA[i].ReplaceAllString(replacedA, replacement), regexesB[i].ReplaceAllString(replacedB, replacement)
}
// replace multiple spaces with single space
replacedA, replacedB = spaceRegex.ReplaceAllString(replacedA, " "), spaceRegex.ReplaceAllString(replacedB, " ")
- if replacedA != replacedB {
- return fmt.Errorf("line %d: files %s and %s do not match: \n\t%s\n\t%s\n\n\t%s\n\t%s", line, fileNameA, fileNameB, textA, textB, replacedA, replacedB)
+ // ignore voltha-protos import of ofp vs voltha
+ replacedA, replacedB = libGoImportRegex.ReplaceAllString(replacedA, "{{lib-go-import}}"), libGoImportRegex.ReplaceAllString(replacedB, "{{lib-go-import}}")
+
+ if replacedA != replacedB && textA != textB {
+ return fmt.Errorf("files which must be identical do not match: \n %s:%d\n %s\n %s:%d\n %s\n\n\t%s\n\t%s", fileNameA, lineA, textA, fileNameB, lineB, textB, replacedA, replacedB)
}
- line++
+ lineA++
+ lineB++
}
if err := scannerA.Err(); err != nil {
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
index 5b2890a..2edc29e 100644
--- a/rw_core/core/device/group/loader.go
+++ b/rw_core/core/device/group/loader.go
@@ -30,11 +30,10 @@
// Loader hides all low-level locking & synchronization related to group state updates
type Loader struct {
+ dbProxy *model.Proxy
// this lock protects the groups map, it does not protect individual groups
lock sync.RWMutex
groups map[uint32]*chunk
-
- dbProxy *model.Proxy
}
// chunk keeps a group and the lock for this group
@@ -48,8 +47,8 @@
func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- groups: make(map[uint32]*chunk),
dbProxy: dbProxy,
+ groups: make(map[uint32]*chunk),
}
}
@@ -86,8 +85,6 @@
loader.lock.Unlock()
if err := loader.dbProxy.Set(ctx, fmt.Sprint(group.Desc.GroupId), group); err != nil {
- logger.Errorw("failed-adding-group-to-db", log.Fields{"groupID": group.Desc.GroupId, "err": err})
-
// revert the map
loader.lock.Lock()
delete(loader.groups, group.Desc.GroupId)
@@ -130,6 +127,8 @@
return &Handle{loader: loader, chunk: entry}, true
}
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
type Handle struct {
loader *Loader
chunk *chunk
@@ -173,10 +172,10 @@
}
}
-// List returns a snapshot of all the managed group IDs
+// ListIDs returns a snapshot of all the managed group IDs
// TODO: iterating through groups safely is expensive now, since all groups are stored & locked separately
// should avoid this where possible
-func (loader *Loader) List() map[uint32]struct{} {
+func (loader *Loader) ListIDs() map[uint32]struct{} {
loader.lock.RLock()
defer loader.lock.RUnlock()
// copy the IDs so caller can safely iterate
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index f943106..c14750d 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -19,7 +19,6 @@
import (
"context"
"encoding/hex"
- "fmt"
"sync"
"time"
@@ -27,6 +26,7 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
+ "github.com/opencord/voltha-go/rw_core/core/device/logical_port"
"github.com/opencord/voltha-go/rw_core/core/device/meter"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
"github.com/opencord/voltha-go/rw_core/route"
@@ -42,26 +42,26 @@
// LogicalAgent represent attributes of logical device agent
type LogicalAgent struct {
- logicalDeviceID string
- serialNumber string
- rootDeviceID string
- deviceMgr *Manager
- ldeviceMgr *LogicalManager
- ldProxy *model.Proxy
- stopped bool
- deviceRoutes *route.DeviceRoutes
- logicalPortsNo map[uint32]bool //value is true for NNI port
- lockLogicalPortsNo sync.RWMutex
- flowDecomposer *fd.FlowDecomposer
- defaultTimeout time.Duration
- logicalDevice *voltha.LogicalDevice
- requestQueue *coreutils.RequestQueue
- startOnce sync.Once
- stopOnce sync.Once
+ logicalDeviceID string
+ serialNumber string
+ rootDeviceID string
+ deviceMgr *Manager
+ ldeviceMgr *LogicalManager
+ ldProxy *model.Proxy
+ stopped bool
+ deviceRoutes *route.DeviceRoutes
+ flowDecomposer *fd.FlowDecomposer
+ defaultTimeout time.Duration
+ logicalDevice *voltha.LogicalDevice
+ requestQueue *coreutils.RequestQueue
+ orderedEvents orderedEvents
+ startOnce sync.Once
+ stopOnce sync.Once
flowLoader *flow.Loader
meterLoader *meter.Loader
groupLoader *group.Loader
+ portLoader *port.Loader
}
func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
@@ -74,13 +74,13 @@
ldProxy: ldProxy,
ldeviceMgr: ldeviceMgr,
flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
- logicalPortsNo: make(map[uint32]bool),
defaultTimeout: defaultTimeout,
requestQueue: coreutils.NewRequestQueue(),
flowLoader: flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
+ portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return agent
@@ -134,7 +134,7 @@
}
logger.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
- agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
+ agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
@@ -158,14 +158,13 @@
agent.rootDeviceID = ld.RootDeviceId
// Update the last data
- agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
+ agent.logicalDevice = ld
- // Setup the local list of logical ports
- agent.addLogicalPortsToMap(ld.Ports)
// load the flows, meters and groups from KV to cache
agent.flowLoader.Load(ctx)
agent.meterLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
}
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
@@ -200,6 +199,8 @@
} else {
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
+ // TODO: remove all entries from all loaders
+ // TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
@@ -217,28 +218,6 @@
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
-// getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
-func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
- logger.Debug("getLogicalDeviceWithoutLock")
- return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
-}
-
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
-func (agent *LogicalAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
- if agent.stopped {
- return fmt.Errorf("logical device agent stopped-%s", logicalDevice.Id)
- }
-
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.ldProxy.Set(updateCtx, agent.logicalDeviceID, logicalDevice); err != nil {
- logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
- return err
- }
-
- agent.logicalDevice = logicalDevice
- return nil
-}
-
func (agent *LogicalAgent) addFlowsAndGroupsToDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index b06b0f7..ff0f6e1 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -34,7 +34,7 @@
// listLogicalDeviceFlows returns logical device flows
func (agent *LogicalAgent) listLogicalDeviceFlows() map[uint64]*ofp.OfpFlowStats {
- flowIDs := agent.flowLoader.List()
+ flowIDs := agent.flowLoader.ListIDs()
flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
for flowID := range flowIDs {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
@@ -142,7 +142,7 @@
return changed, updated, err
}
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
@@ -242,7 +242,7 @@
}
// search through all the flows
- for flowID := range agent.flowLoader.List() {
+ for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flow := flowHandle.GetReadOnly(); fu.FlowMatchesMod(flow, mod) {
toDelete[flow.Id] = flow
@@ -283,7 +283,7 @@
}
groups := make(map[uint32]*ofp.OfpGroupEntry)
- for groupID := range agent.groupLoader.List() {
+ for groupID := range agent.groupLoader.ListIDs() {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
@@ -344,7 +344,7 @@
defer flowHandle.Unlock()
groups := make(map[uint32]*ofp.OfpGroupEntry)
- for groupID := range agent.groupLoader.List() {
+ for groupID := range agent.groupLoader.ListIDs() {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
@@ -419,7 +419,7 @@
func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
- for flowID := range agent.flowLoader.List() {
+ for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
if err := flowHandle.Delete(ctx); err != nil {
@@ -438,7 +438,7 @@
func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
- for flowID := range agent.flowLoader.List() {
+ for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flow := flowHandle.GetReadOnly(); fu.FlowHasOutGroup(flow, groupID) {
if err := flowHandle.Delete(ctx); err != nil {
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index bf2edbb..56f23bc 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -31,7 +31,7 @@
// listLogicalDeviceGroups returns logical device flow groups
func (agent *LogicalAgent) listLogicalDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
- groupIDs := agent.groupLoader.List()
+ groupIDs := agent.groupLoader.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
@@ -110,7 +110,7 @@
toDelete := map[uint32]struct{}{groupMod.GroupId: {}}
if groupMod.GroupId == uint32(ofp.OfpGroup_OFPG_ALL) {
- toDelete = agent.groupLoader.List()
+ toDelete = agent.groupLoader.ListIDs()
}
for groupID := range toDelete {
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index dc44fda..991479a 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -29,7 +29,7 @@
// listLogicalDeviceMeters returns logical device meters
func (agent *LogicalAgent) listLogicalDeviceMeters() map[uint32]*ofp.OfpMeterEntry {
- meterIDs := agent.meterLoader.List()
+ meterIDs := agent.meterLoader.ListIDs()
meters := make(map[uint32]*ofp.OfpMeterEntry, len(meterIDs))
for meterID := range meterIDs {
if meterHandle, have := agent.meterLoader.Lock(meterID); have {
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7229e05..ee4e77d 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -19,51 +19,46 @@
import (
"context"
"fmt"
+ "sync"
- "github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- ic "github.com/opencord/voltha-protos/v3/go/inter_container"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-// ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
- logger.Debug("ListLogicalDevicePorts")
- logicalDevice, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return nil, err
+// listLogicalDevicePorts returns logical device ports
+func (agent *LogicalAgent) listLogicalDevicePorts() map[uint32]*voltha.LogicalPort {
+ logger.Debug("listLogicalDevicePorts")
+ portIDs := agent.portLoader.ListIDs()
+ ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
+ for portID := range portIDs {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ ret[portID] = portHandle.GetReadOnly()
+ portHandle.Unlock()
+ }
}
- if logicalDevice == nil {
- return &voltha.LogicalPorts{}, nil
- }
- lPorts := make([]*voltha.LogicalPort, 0)
- lPorts = append(lPorts, logicalDevice.Ports...)
- return &voltha.LogicalPorts{Items: lPorts}, nil
+ return ret
}
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
- var err error
switch port.Type {
case voltha.Port_ETHERNET_NNI:
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, true)
case voltha.Port_ETHERNET_UNI:
- if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
+ if err := agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, false)
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
go func() {
- if err = agent.buildRoutes(ctx); err != nil {
+ if err := agent.buildRoutes(ctx); err != nil {
// Not an error - temporary state
logger.Infow("failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
}
@@ -72,7 +67,7 @@
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err = agent.updateAllRoutes(ctx, device); err != nil {
+ if err := agent.updateAllRoutes(ctx, device); err != nil {
// Not an error - temporary state
logger.Infow("failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(device.Ports), "error": err})
}
@@ -134,89 +129,77 @@
//Get UNI port number
for _, port := range device.Ports {
if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+ if err = agent.addNNILogicalPort(ctx, device, port); err != nil {
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
- agent.addLogicalPortToMap(port.PortNo, true)
}
}
return err
}
// updatePortState updates the port state of the device
-func (agent *LogicalAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortState(ctx context.Context, portNo uint32, operStatus voltha.OperStatus_Types) error {
logger.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
+ }
+ defer portHandle.Unlock()
+
+ newPort := clonePortSetState(portHandle.GetReadOnly(), operStatus)
+ if err := portHandle.Update(ctx, newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == deviceID && port.DevicePortNo == portNo {
- if operStatus == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- }
- // Update the logical device
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return err
- }
- return nil
- }
- }
- return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+ return nil
}
// updatePortsState updates the ports state related to the device
-func (agent *LogicalAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == device.Id {
- if state == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+
+ for portNo := range agent.portLoader.ListIDsForDevice(deviceID) {
+ if portHandle, have := agent.portLoader.Lock(portNo); have {
+ newPort := clonePortSetState(portHandle.GetReadOnly(), state)
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ portHandle.Unlock()
+ return err
}
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+
+ portHandle.Unlock()
}
}
- // Updating the logical device will trigger the poprt change events to be populated to the controller
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
return nil
}
+func clonePortSetState(oldPort *voltha.LogicalPort, state voltha.OperStatus_Types) *voltha.LogicalPort {
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ if state == voltha.OperStatus_ACTIVE {
+ newOfpPort.Config = newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ newOfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ } else {
+ newOfpPort.Config = newOfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ newOfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ }
+ return &newPort
+}
+
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
logger.Infow("setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
- var added bool
//Get UNI port number
for _, port := range childDevice.Ports {
if port.Type == voltha.Port_ETHERNET_UNI {
- if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
+ if err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
- if added {
- agent.addLogicalPortToMap(port.PortNo, false)
- }
}
}
return err
@@ -225,86 +208,52 @@
// deleteAllLogicalPorts deletes all logical ports associated with this logical device
func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- return nil
-}
-
-// deleteLogicalPort removes the logical port
-func (agent *LogicalAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPort.Id {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- if index < len(clonedPorts)-1 {
- copy(clonedPorts[index:], clonedPorts[index+1:])
- }
- clonedPorts[len(clonedPorts)-1] = nil
- clonedPorts = clonedPorts[:len(clonedPorts)-1]
- logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
-
- // Remove the logical port from cache
- agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
- // Reset the logical device routes
- go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
- logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ // for each port
+ for portID := range agent.portLoader.ListIDs() {
+ // TODO: can just call agent.deleteLogicalPort()?
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ oldPort := portHandle.GetReadOnly()
+ // delete
+ err := portHandle.Delete(ctx)
+ portHandle.Unlock()
+ if err != nil {
+ return err
}
- }()
+ // and send event
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_DELETE, oldPort.OfpPort)
+ }
}
+
+ // Reset the logical device routes
+ go func() {
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ }
+ }()
return nil
}
// deleteLogicalPorts removes the logical ports associated with that deviceId
func (agent *LogicalAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- lPortstoKeep := []*voltha.LogicalPort{}
- lPortsNoToDelete := []uint32{}
- for _, logicalPort := range logicalDevice.Ports {
- if logicalPort.DeviceId != deviceID {
- lPortstoKeep = append(lPortstoKeep, logicalPort)
- } else {
- lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
+ // for each port
+ for portNo := range agent.portLoader.ListIDsForDevice(deviceID) {
+ if portHandle, have := agent.portLoader.Lock(portNo); have {
+ // if belongs to this device
+ if oldPort := portHandle.GetReadOnly(); oldPort.DeviceId == deviceID {
+ // delete
+ if err := portHandle.Delete(ctx); err != nil {
+ portHandle.Unlock()
+ return err
+ }
+ // and send event
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_DELETE, oldPort.OfpPort)
+ }
+ portHandle.Unlock()
}
}
- logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
- return err
- }
- // Remove the port from the cached logical ports set
- agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
// Reset the logical device routes
go func() {
@@ -312,335 +261,286 @@
logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
-
return nil
}
// enableLogicalPort enables the logical port
-func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortNo uint32) error {
+ portHandle, have := agent.portLoader.Lock(lPortNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", lPortNo)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ newOfpPort.Config = newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-// disableLogicalPort disabled the logical port
-func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- // Get the most up to date logical device
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- defer agent.requestQueue.RequestComplete()
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
- // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
- // request that the Core makes following a port create event.
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
-
- portCap.Port.RootPort = true
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- lp.DeviceId = device.Id
- lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
- lp.OfpPort.PortNo = port.PortNo
- lp.OfpPort.Name = lp.Id
- lp.DevicePortNo = port.PortNo
-
- ld := agent.getLogicalDeviceWithoutLock()
-
- clonedPorts := clonePorts(ld.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, lp)
-
- if err = agent.addLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts, lp, device); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return false, err
- }
-
- return true, nil
-}
-
-func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
- ldevice := agent.getLogicalDeviceWithoutLock()
- for _, lPort := range ldevice.Ports {
- if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo {
- return true
- }
- }
- return false
-}
-
-// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
- logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
- return false, nil
- }
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- defer agent.requestQueue.RequestComplete()
-
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
- // TODO: Change the port creation logic to include the port capability. This will eliminate the port capability
- // request that the Core makes following a port create event.
-
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
-
- // Get stored logical device
- ldevice := agent.getLogicalDeviceWithoutLock()
-
- logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- portCap.Port.RootPort = false
- portCap.Port.Id = port.Label
- portCap.Port.OfpPort.PortNo = port.PortNo
- portCap.Port.DeviceId = childDevice.Id
- portCap.Port.DevicePortNo = port.PortNo
- clonedPorts := clonePorts(ldevice.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, portCap.Port)
-
- if err = agent.addLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts, portCap.Port, childDevice); err != nil {
- return false, err
- }
-
- return true, nil
-}
-
-func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
- return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
-}
-
-//updateLogicalDevicePortsWithoutLock updates the
-func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
- oldPorts := device.Ports
- device.Ports = newPorts
- if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
- return err
- }
- agent.portUpdated(oldPorts, newPorts)
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
return nil
}
-// addLogicalDevicePortsWithoutLock add the new ports to the logical device, update routes associated with those new
-// ports and send an add port event to the OF controller
-func (agent *LogicalAgent) addLogicalDevicePortsWithoutLock(ctx context.Context, lDevice *voltha.LogicalDevice, newPorts []*voltha.LogicalPort, lp *voltha.LogicalPort, device *voltha.Device) error {
- oldPorts := lDevice.Ports
- lDevice.Ports = newPorts
- if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+// disableLogicalPort disabled the logical port
+func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortNo uint32) error {
+ portHandle, have := agent.portLoader.Lock(lPortNo)
+ if !have {
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", lPortNo)
+ }
+ defer portHandle.Unlock()
+
+ oldPort := portHandle.GetReadOnly()
+
+ newPort := *oldPort // only clone the struct(s) that will be changed
+ newOfpPort := *oldPort.OfpPort
+ newPort.OfpPort = &newOfpPort
+
+ newOfpPort.Config = (newOfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ if err := portHandle.Update(ctx, &newPort); err != nil {
return err
}
+ agent.orderedEvents.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_MODIFY, newPort.OfpPort)
+ return nil
+}
+
+// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+ logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+
+ label := fmt.Sprintf("nni-%d", port.PortNo)
+ tmpPort := &voltha.LogicalPort{
+ RootPort: true,
+ DeviceId: device.Id,
+ Id: label,
+ DevicePortNo: port.PortNo,
+ OfpPort: &voltha.OfpPort{
+ PortNo: port.PortNo,
+ Name: label,
+ },
+ OfpPortStats: &ofp.OfpPortStats{},
+ }
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ if err != nil {
+ return err
+ }
+ defer portHandle.Unlock()
+
+ if !created {
+ logger.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
+ // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
+ // the port capability request that the Core makes following a port create event.
+ // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
+ // external request runs), this is a temporary hack to avoid updating port state before the port is ready
+
+ // First get the port capability
+ portCap, err := agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo)
+ if err != nil {
+ logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+ return err
+ }
+
+ newPort := portCap.Port
+ newPort.RootPort = true
+ newPort.DeviceId = device.Id
+ newPort.Id = label
+ newPort.DevicePortNo = port.PortNo
+ newPort.OfpPort.PortNo = port.PortNo
+ newPort.OfpPort.Name = label
+
+ // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
+ // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
+ // while doing this, and can be removed later.
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ if err := portHandle.Delete(context.Background()); err != nil {
+ return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
+ }
+ return err
+ }
+
+ // ensure that no events will be sent until this one is
+ queuePosition := agent.orderedEvents.assignQueuePosition()
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), device, lp, newPorts); err != nil {
+ if err := agent.updateRoutes(context.Background(), device, newPort, agent.listLogicalDevicePorts()); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
- logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
}
- // Send a port update event
- agent.portUpdated(oldPorts, newPorts)
+ // send event, and allow any queued events to be sent as well
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
}()
-
return nil
}
-// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
-func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
- newPorts = make(map[string]*voltha.LogicalPort, len(newList))
- changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
- deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
-
- for _, n := range newList {
- newPorts[n.Id] = n
+// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) error {
+ logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
+ if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+ logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+ return nil
}
- for _, o := range oldList {
- if n, have := newPorts[o.Id]; have {
- delete(newPorts, o.Id) // not new
- if !proto.Equal(n, o) {
- changedPorts[n.Id] = n // changed
- }
- } else {
- deletedPorts[o.Id] = o // deleted
+ tmpPort := &voltha.LogicalPort{
+ RootPort: false,
+ DeviceId: childDevice.Id,
+ Id: port.Label,
+ DevicePortNo: port.PortNo,
+ OfpPort: &voltha.OfpPort{
+ PortNo: port.PortNo,
+ },
+ OfpPortStats: &ofp.OfpPortStats{},
+ }
+
+ portHandle, created, err := agent.portLoader.LockOrCreate(ctx, tmpPort)
+ if err != nil {
+ return err
+ }
+ defer portHandle.Unlock()
+
+ if !created {
+ logger.Debugw("port-already-exist", log.Fields{"port": port})
+ return nil
+ }
+
+ // TODO: VOL-3202 Change the port creation logic to include the port capability. This will eliminate
+ // the port capability request that the Core makes following a port create event.
+ // TODO: VOL-3202 the port lock should not be held while getPortCapability() runs (preferably not while *any*
+ // external request runs), this is a temporary hack to avoid updating port state before the port is ready
+
+ // First get the port capability
+ portCap, err := agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo)
+ if err != nil {
+ logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+ return err
+ }
+
+ logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
+ newPort := portCap.Port
+ newPort.RootPort = false
+ newPort.DeviceId = childDevice.Id
+ newPort.Id = port.Label
+ newPort.DevicePortNo = port.PortNo
+ newPort.OfpPort.PortNo = port.PortNo
+
+ // TODO: VOL-3202 shouldn't create tmp port then update, should prepare complete port first then LockOrCreate()
+ // the use of context.Background() is required to ensure we don't get an inconsistent logical port state
+ // while doing this, and can be removed later.
+ if err := portHandle.Update(ctx, newPort); err != nil {
+ if err := portHandle.Delete(context.Background()); err != nil {
+ return fmt.Errorf("unable-to-delete-%d: %s", port.PortNo, err)
}
+ return err
}
- return newPorts, changedPorts, deletedPorts
-}
+ // ensure that no events will be sent until this one is
+ queuePosition := agent.orderedEvents.assignQueuePosition()
-// portUpdated is invoked when a port is updated on the logical device
-func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
- // Get the difference between the two list
- newPorts, changedPorts, deletedPorts := diff(prevPorts, currPorts)
-
- // Send the port change events to the OF controller
- for _, newP := range newPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
- }
- for _, change := range changedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
- }
- for _, del := range deletedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
- }
-
+ // Setup the routes for this device and then send the port update event to the OF Controller
+ go func() {
+ // First setup the routes
+ if err := agent.updateRoutes(context.Background(), childDevice, newPort, agent.listLogicalDevicePorts()); err != nil {
+ // This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
+ // created yet.
+ logger.Infow("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": newPort.OfpPort.PortNo, "error": err})
+ }
+ // send event, and allow any queued events to be sent as well
+ queuePosition.send(agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, newPort.OfpPort)
+ }()
return nil
}
-//GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
-//returns their port numbers. This function is invoked only during flow decomposition where the lock on the logical
-//device is already held. Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
- lPorts := make([]uint32, 0)
- var exclPort uint32
- if len(excludePort) == 1 {
- exclPort = excludePort[0]
- }
- lDevice := agent.getLogicalDeviceWithoutLock()
- for _, port := range lDevice.Ports {
- if port.OfpPort.PortNo != exclPort {
- lPorts = append(lPorts, port.OfpPort.PortNo)
- }
- }
- return lPorts
+// send is a convenience to avoid calling both assignQueuePosition and qp.send
+func (e *orderedEvents) send(agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
+ qp := e.assignQueuePosition()
+ go qp.send(agent, deviceID, reason, desc)
}
-// helpers for agent.logicalPortsNo
+// TODO: shouldn't need to guarantee event ordering like this
+// event ordering should really be protected by per-LogicalPort lock
+// once routing uses on-demand calculation only, this should be changed
+// assignQueuePosition ensures that no events will be sent until this thread calls send() on the returned queuePosition
+func (e *orderedEvents) assignQueuePosition() queuePosition {
+ e.mutex.Lock()
+ defer e.mutex.Unlock()
-func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- if exist := agent.logicalPortsNo[portNo]; !exist {
- agent.logicalPortsNo[portNo] = nniPort
+ prev := e.last
+ next := make(chan struct{})
+ e.last = next
+ return queuePosition{
+ prev: prev,
+ next: next,
}
}
-func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, lp := range lps {
- if exist := agent.logicalPortsNo[lp.DevicePortNo]; !exist {
- agent.logicalPortsNo[lp.DevicePortNo] = lp.RootPort
- }
- }
+// orderedEvents guarantees the order that events are sent, while allowing events to back up.
+type orderedEvents struct {
+ mutex sync.Mutex
+ last <-chan struct{}
}
-func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, pNo := range portsNo {
- delete(agent.logicalPortsNo, pNo)
- }
+type queuePosition struct {
+ prev <-chan struct{}
+ next chan<- struct{}
}
+// send waits for its turn, then sends the event, then notifies the next in line
+func (qp queuePosition) send(agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
+ if qp.prev != nil {
+ <-qp.prev // wait for turn
+ }
+ agent.ldeviceMgr.SendChangeEvent(deviceID, reason, desc)
+ close(qp.next) // notify next
+}
+
+// GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
+// returns their port numbers.
+func (agent *LogicalAgent) GetWildcardInputPorts(excludePort uint32) map[uint32]struct{} {
+ portIDs := agent.portLoader.ListIDs()
+ delete(portIDs, excludePort)
+ return portIDs
+}
+
+// isNNIPort return true iff the specified port belongs to the parent (OLT) device
func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- if exist := agent.logicalPortsNo[portNo]; exist {
- return agent.logicalPortsNo[portNo]
+ portHandle, have := agent.portLoader.Lock(portNo)
+ if !have {
+ return false
}
- return false
+ defer portHandle.Unlock()
+
+ // any root-device logical port is an NNI port
+ return portHandle.GetReadOnly().RootPort
}
-func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- return portNo, nil
- }
+// getAnyNNIPort returns an NNI port
+func (agent *LogicalAgent) getAnyNNIPort() (uint32, error) {
+ for portID := range agent.portLoader.ListIDsForDevice(agent.rootDeviceID) {
+ return portID, nil
}
return 0, status.Error(codes.NotFound, "No NNI port found")
}
-//GetNNIPorts returns NNI ports.
-func (agent *LogicalAgent) GetNNIPorts() []uint32 {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- nniPorts := make([]uint32, 0)
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- nniPorts = append(nniPorts, portNo)
- }
- }
- return nniPorts
+//GetNNIPorts returns all NNI ports
+func (agent *LogicalAgent) GetNNIPorts() map[uint32]struct{} {
+ return agent.portLoader.ListIDsForDevice(agent.rootDeviceID)
}
// getUNILogicalPortNo returns the UNI logical port number specified in the flow
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index fa96caf..dbf5e57 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+
"github.com/opencord/voltha-go/rw_core/route"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -34,7 +35,7 @@
// Controller-bound flow
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
- logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+ logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo})
if agent.isNNIPort(ingressPortNo) {
//This is a trap on the NNI Port
if agent.deviceRoutes.IsRoutesEmpty() {
@@ -55,7 +56,7 @@
}
// Treat it as if the output port is the first NNI of the OLT
var err error
- if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+ if egressPortNo, err = agent.getAnyNNIPort(); err != nil {
logger.Warnw("no-nni-port", log.Fields{"error": err})
return nil, err
}
@@ -97,13 +98,7 @@
}
defer agent.requestQueue.RequestComplete()
- if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
- }
- // Get all the logical ports on that logical device
- lDevice := agent.getLogicalDeviceWithoutLock()
-
- if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
+ if err := agent.deviceRoutes.ComputeRoutes(ctx, agent.listLogicalDevicePorts()); err != nil {
return err
}
if err := agent.deviceRoutes.Print(); err != nil {
@@ -113,7 +108,7 @@
}
//updateRoutes updates the device routes
-func (agent *LogicalAgent) updateRoutes(ctx context.Context, device *voltha.Device, lp *voltha.LogicalPort, lps []*voltha.LogicalPort) error {
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, device *voltha.Device, lp *voltha.LogicalPort, lps map[uint32]*voltha.LogicalPort) error {
logger.Debugw("updateRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "port:": lp})
if err := agent.deviceRoutes.AddPort(ctx, lp, device, lps); err != nil {
@@ -129,12 +124,7 @@
func (agent *LogicalAgent) updateAllRoutes(ctx context.Context, device *voltha.Device) error {
logger.Debugw("updateAllRoutes", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": device.Id, "ports-count": len(device.Ports)})
- ld, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return err
- }
-
- if err := agent.deviceRoutes.AddAllPorts(ctx, device, ld.Ports); err != nil {
+ if err := agent.deviceRoutes.AddAllPorts(ctx, device, agent.listLogicalDevicePorts()); err != nil {
return err
}
if err := agent.deviceRoutes.Print(); err != nil {
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 2e1b1d3..40c6b9c 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -40,328 +40,6 @@
"github.com/stretchr/testify/assert"
)
-func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{}
- updatedLogicalPorts := []*voltha.LogicalPort{}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 0, len(newPorts))
- assert.Equal(t, 0, len(changedPorts))
- assert.Equal(t, 0, len(deletedPorts))
-}
-
-func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1233",
- DeviceId: "d1234",
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 1,
- State: 1,
- },
- },
- }
- updatedLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1233",
- DeviceId: "d1234",
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 1,
- State: 1,
- },
- },
- }
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 0, len(newPorts))
- assert.Equal(t, 0, len(changedPorts))
- assert.Equal(t, 0, len(deletedPorts))
-}
-
-func TestLogicalDeviceAgent_diff_add(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{}
- updatedLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 1,
- State: 1,
- },
- },
- }
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 2, len(newPorts))
- assert.Equal(t, 0, len(changedPorts))
- assert.Equal(t, 0, len(deletedPorts))
- assert.Equal(t, updatedLogicalPorts[0], newPorts[updatedLogicalPorts[0].Id])
- assert.Equal(t, updatedLogicalPorts[1], newPorts[updatedLogicalPorts[1].Id])
-}
-
-func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- }
- updatedLogicalPorts := []*voltha.LogicalPort{}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 0, len(newPorts))
- assert.Equal(t, 0, len(changedPorts))
- assert.Equal(t, 1, len(deletedPorts))
- assert.Equal(t, currentLogicalPorts[0], deletedPorts[currentLogicalPorts[0].Id])
-}
-
-func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1233",
- DeviceId: "d1234",
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 1,
- State: 1,
- },
- },
- }
- updatedLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 4,
- State: 4,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 4,
- State: 4,
- },
- },
- {
- Id: "1233",
- DeviceId: "d1234",
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 1,
- State: 1,
- },
- },
- }
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 0, len(newPorts))
- assert.Equal(t, 2, len(changedPorts))
- assert.Equal(t, 0, len(deletedPorts))
- assert.Equal(t, updatedLogicalPorts[0], changedPorts[updatedLogicalPorts[0].Id])
- assert.Equal(t, updatedLogicalPorts[1], changedPorts[updatedLogicalPorts[1].Id])
-}
-
-func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
- currentLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 1,
- State: 1,
- },
- },
- {
- Id: "1233",
- DeviceId: "d1234",
- DevicePortNo: 3,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 3,
- Name: "port3",
- Config: 1,
- State: 1,
- },
- },
- }
- updatedLogicalPorts := []*voltha.LogicalPort{
- {
- Id: "1231",
- DeviceId: "d1234",
- DevicePortNo: 1,
- RootPort: true,
- OfpPort: &ofp.OfpPort{
- PortNo: 1,
- Name: "port1",
- Config: 4,
- State: 4,
- },
- },
- {
- Id: "1232",
- DeviceId: "d1234",
- DevicePortNo: 2,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 2,
- Name: "port2",
- Config: 4,
- State: 4,
- },
- },
- {
- Id: "1234",
- DeviceId: "d1234",
- DevicePortNo: 4,
- RootPort: false,
- OfpPort: &ofp.OfpPort{
- PortNo: 4,
- Name: "port4",
- Config: 4,
- State: 4,
- },
- },
- }
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
- assert.Equal(t, 1, len(newPorts))
- assert.Equal(t, 2, len(changedPorts))
- assert.Equal(t, 1, len(deletedPorts))
- assert.Equal(t, updatedLogicalPorts[0], changedPorts[updatedLogicalPorts[0].Id])
- assert.Equal(t, updatedLogicalPorts[1], changedPorts[updatedLogicalPorts[1].Id])
- assert.Equal(t, currentLogicalPorts[2], deletedPorts[currentLogicalPorts[2].Id])
-}
-
type LDATest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *Manager
@@ -442,7 +120,7 @@
DevicePortNo: 3,
RootPort: false,
OfpPort: &ofp.OfpPort{
- PortNo: 4,
+ PortNo: 3,
Name: "port3",
Config: 4,
State: 4,
@@ -508,6 +186,16 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
+ for _, port := range clonedLD.Ports {
+ handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), port)
+ if err != nil {
+ panic(err)
+ }
+ handle.Unlock()
+ if !created {
+ t.Errorf("port %d already exists", port.OfpPort.PortNo)
+ }
+ }
err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
assert.Nil(t, err)
lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
@@ -522,7 +210,7 @@
// Change the state of the first port to FAILED
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
assert.Nil(t, err)
localWG.Done()
}()
@@ -530,7 +218,7 @@
// Change the state of the second port to TESTING
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
assert.Nil(t, err)
localWG.Done()
}()
@@ -538,9 +226,9 @@
// Change the state of the third port to UNKNOWN and then back to ACTIVE
localWG.Add(1)
go func() {
- err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
+ err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
assert.Nil(t, err)
- err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
+ err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
assert.Nil(t, err)
localWG.Done()
}()
@@ -582,9 +270,11 @@
expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
- assert.NotNil(t, updatedLogicalDevice)
- assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
+
+ updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts()
+ for _, p := range expectedChange.Ports {
+ assert.True(t, proto.Equal(p, updatedLogicalDevicePorts[p.DevicePortNo]))
+ }
globalWG.Done()
}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index d6ca4ce..b3f26da 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -20,6 +20,7 @@
"context"
"errors"
"io"
+ "strconv"
"strings"
"sync"
"time"
@@ -260,6 +261,8 @@
}
// Device is child device
// retrieve parent device using child device ID
+ // TODO: return (string, have) instead of *string
+ // also: If not root device, just return device.parentID instead of loading the parent device.
if parentDevice := ldMgr.deviceMgr.getParentDevice(ctx, device); parentDevice != nil {
return &parentDevice.ParentId, nil
}
@@ -276,26 +279,6 @@
return ldMgr.getLogicalDeviceID(ctx, device)
}
-func (ldMgr *LogicalManager) getLogicalPortID(ctx context.Context, device *voltha.Device) (*voltha.LogicalPortId, error) {
- // Get the logical device where this device is attached
- var lDeviceID *string
- var err error
- if lDeviceID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
- return nil, err
- }
- var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: *lDeviceID}); err != nil {
- return nil, err
- }
- // Go over list of ports
- for _, port := range lDevice.Ports {
- if port.DeviceId == device.Id {
- return &voltha.LogicalPortId{Id: *lDeviceID, PortId: port.Id}, nil
- }
- }
- return nil, status.Errorf(codes.NotFound, "%s", device.Id)
-}
-
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id.Id})
@@ -333,22 +316,29 @@
// ListLogicalDevicePorts returns logical device ports
func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id.Id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
- return agent.ListLogicalDevicePorts(ctx)
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+
+ ports := agent.listLogicalDevicePorts()
+ ctr, ret := 0, make([]*voltha.LogicalPort, len(ports))
+ for _, port := range ports {
+ ret[ctr] = port
+ ctr++
+ }
+ return &voltha.LogicalPorts{Items: ret}, nil
}
// GetLogicalDevicePort returns logical device port details
func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
- // Get the logical device where this device is attached
- var err error
- var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: lPortID.Id}); err != nil {
- return nil, err
+ // Get the logical device where this port is attached
+ agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", lPortID.Id)
}
- // Go over list of ports
- for _, port := range lDevice.Ports {
+
+ for _, port := range agent.listLogicalDevicePorts() {
if port.Id == lPortID.PortId {
return port, nil
}
@@ -373,30 +363,6 @@
return nil
}
-// deleteLogicalPort removes the logical port associated with a device
-func (ldMgr *LogicalManager) deleteLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) error {
- logger.Debugw("deleting-logical-port", log.Fields{"LDeviceId": lPortID.Id})
- // Get logical port
- var logicalPort *voltha.LogicalPort
- var err error
- if logicalPort, err = ldMgr.GetLogicalDevicePort(ctx, lPortID); err != nil {
- logger.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
- return err
- }
- // Sanity check
- if logicalPort.RootPort {
- return errors.New("device-root")
- }
- if agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id); agent != nil {
- if err := agent.deleteLogicalPort(ctx, logicalPort); err != nil {
- logger.Warnw("deleting-logicalport-failed", log.Fields{"LDeviceId": lPortID.Id, "error": err})
- }
- }
-
- logger.Debug("deleting-logical-port-ends")
- return nil
-}
-
// deleteLogicalPort removes the logical port associated with a child device
func (ldMgr *LogicalManager) deleteLogicalPorts(ctx context.Context, deviceID string) error {
logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
@@ -469,7 +435,7 @@
return err
}
if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
- if err := agent.updatePortState(ctx, deviceID, portNo, state); err != nil {
+ if err := agent.updatePortState(ctx, portNo, state); err != nil {
return err
}
}
@@ -487,7 +453,7 @@
return err
}
if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
- if err := agent.updatePortsState(ctx, device, state); err != nil {
+ if err := agent.updatePortsState(ctx, device.Id, state); err != nil {
return err
}
}
@@ -547,7 +513,11 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return &empty.Empty{}, agent.enableLogicalPort(ctx, id.PortId)
+ portNo, err := strconv.ParseUint(id.PortId, 10, 32)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "failed to parse %s as a number", id.PortId)
+ }
+ return &empty.Empty{}, agent.enableLogicalPort(ctx, uint32(portNo))
}
// DisableLogicalDevicePort disables logical device port
@@ -557,7 +527,11 @@
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return &empty.Empty{}, agent.disableLogicalPort(ctx, id.PortId)
+ portNo, err := strconv.ParseUint(id.PortId, 10, 32)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "failed to parse %s as a number", id.PortId)
+ }
+ return &empty.Empty{}, agent.disableLogicalPort(ctx, uint32(portNo))
}
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
diff --git a/rw_core/core/device/logical_port/common.go b/rw_core/core/device/logical_port/common.go
new file mode 100644
index 0000000..85e6af2
--- /dev/null
+++ b/rw_core/core/device/logical_port/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package port
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "port"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device/logical_port/loader.go b/rw_core/core/device/logical_port/loader.go
new file mode 100644
index 0000000..ab6713e
--- /dev/null
+++ b/rw_core/core/device/logical_port/loader.go
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package port
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to port state updates
+type Loader struct {
+ dbProxy *model.Proxy
+ // this lock protects the ports map, it does not protect individual ports
+ lock sync.RWMutex
+ ports map[uint32]*chunk
+ deviceLookup map[string]map[uint32]struct{}
+}
+
+// chunk keeps a port and the lock for this port
+type chunk struct {
+ // this lock is used to synchronize all access to the port, and also to the "deleted" variable
+ lock sync.Mutex
+ deleted bool
+
+ port *voltha.LogicalPort
+}
+
+func NewLoader(dbProxy *model.Proxy) *Loader {
+ return &Loader{
+ dbProxy: dbProxy,
+ ports: make(map[uint32]*chunk),
+ deviceLookup: make(map[string]map[uint32]struct{}),
+ }
+}
+
+// Load queries existing ports from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+ loader.lock.Lock()
+ defer loader.lock.Unlock()
+
+ var ports []*voltha.LogicalPort
+ if err := loader.dbProxy.List(ctx, &ports); err != nil {
+ logger.Errorw("failed-to-list-ports-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, port := range ports {
+ loader.ports[port.OfpPort.PortNo] = &chunk{port: port}
+ loader.addLookup(port.DeviceId, port.OfpPort.PortNo)
+ }
+}
+
+// LockOrCreate locks this port if it exists, or creates a new port if it does not.
+// In the case of port creation, the provided "port" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, port *voltha.LogicalPort) (*Handle, bool, error) {
+ // try to use read lock instead of full lock if possible
+ if handle, have := loader.Lock(port.OfpPort.PortNo); have {
+ return handle, false, nil
+ }
+
+ loader.lock.Lock()
+ entry, have := loader.ports[port.OfpPort.PortNo]
+ if !have {
+ entry := &chunk{port: port}
+ loader.ports[port.OfpPort.PortNo] = entry
+ loader.addLookup(port.DeviceId, port.OfpPort.PortNo)
+ entry.lock.Lock()
+ loader.lock.Unlock()
+
+ if err := loader.dbProxy.Set(ctx, fmt.Sprint(port.OfpPort.PortNo), port); err != nil {
+ // revert the map
+ loader.lock.Lock()
+ delete(loader.ports, port.OfpPort.PortNo)
+ loader.removeLookup(port.DeviceId, port.OfpPort.PortNo)
+ loader.lock.Unlock()
+
+ entry.deleted = true
+ entry.lock.Unlock()
+ return nil, false, err
+ }
+ return &Handle{loader: loader, chunk: entry}, true, nil
+ }
+ loader.lock.Unlock()
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.LockOrCreate(ctx, port)
+ }
+ return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this port, and returns a handle which can be used to access the port until it's unlocked.
+// This handle ensures that the port cannot be accessed if the lock is not held.
+// Returns false if the port is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint32) (*Handle, bool) {
+ loader.lock.RLock()
+ entry, have := loader.ports[id]
+ loader.lock.RUnlock()
+
+ if !have {
+ return nil, false
+ }
+
+ entry.lock.Lock()
+ if entry.deleted {
+ entry.lock.Unlock()
+ return loader.Lock(id)
+ }
+ return &Handle{loader: loader, chunk: entry}, true
+}
+
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
+type Handle struct {
+ loader *Loader
+ chunk *chunk
+}
+
+// GetReadOnly returns an *voltha.LogicalPort which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *voltha.LogicalPort {
+ return h.chunk.port
+}
+
+// Update updates an existing port in the kv.
+// The provided "port" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, port *voltha.LogicalPort) error {
+ if err := h.loader.dbProxy.Set(ctx, fmt.Sprint(port.OfpPort.PortNo), port); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-port-%v: %s", port.OfpPort.PortNo, err)
+ }
+ h.chunk.port = port
+ return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+ if err := h.loader.dbProxy.Remove(ctx, fmt.Sprint(h.chunk.port.OfpPort.PortNo)); err != nil {
+ return fmt.Errorf("couldnt-delete-port-from-store-%v", h.chunk.port.OfpPort.PortNo)
+ }
+ h.chunk.deleted = true
+
+ h.loader.lock.Lock()
+ delete(h.loader.ports, h.chunk.port.OfpPort.PortNo)
+ h.loader.removeLookup(h.chunk.port.DeviceId, h.chunk.port.OfpPort.PortNo)
+ h.loader.lock.Unlock()
+
+ h.Unlock()
+ return nil
+}
+
+// Unlock releases the lock on the port
+func (h *Handle) Unlock() {
+ if h.chunk != nil {
+ h.chunk.lock.Unlock()
+ h.chunk = nil // attempting to access the port through this handle in future will panic
+ }
+}
+
+// ListIDs returns a snapshot of all the managed port IDs
+// TODO: iterating through ports safely is expensive now, since all ports are stored & locked separately
+// should avoid this where possible
+func (loader *Loader) ListIDs() map[uint32]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ ret := make(map[uint32]struct{}, len(loader.ports))
+ for id := range loader.ports {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
+
+// ListIDsForDevice lists ports belonging to the specified device
+func (loader *Loader) ListIDsForDevice(deviceID string) map[uint32]struct{} {
+ loader.lock.RLock()
+ defer loader.lock.RUnlock()
+ // copy the IDs so caller can safely iterate
+ devicePorts := loader.deviceLookup[deviceID]
+ ret := make(map[uint32]struct{}, len(devicePorts))
+ for id := range devicePorts {
+ ret[id] = struct{}{}
+ }
+ return ret
+}
+
+func (loader *Loader) addLookup(deviceID string, portNo uint32) {
+ if devicePorts, have := loader.deviceLookup[deviceID]; have {
+ devicePorts[portNo] = struct{}{}
+ } else {
+ loader.deviceLookup[deviceID] = map[uint32]struct{}{portNo: {}}
+ }
+}
+
+func (loader *Loader) removeLookup(deviceID string, portNo uint32) {
+ if devicePorts, have := loader.deviceLookup[deviceID]; have {
+ delete(devicePorts, portNo)
+ if len(devicePorts) == 0 {
+ delete(loader.deviceLookup, deviceID)
+ }
+ }
+}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index c7af54a..d12cee9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1168,23 +1168,6 @@
return nil
}
-// DeleteLogicalPort removes the logical port associated with a device
-func (dMgr *Manager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
- logger.Info("deleteLogicalPort")
- var err error
- // Get the logical port associated with this device
- var lPortID *voltha.LogicalPortId
- if lPortID, err = dMgr.logicalDeviceMgr.getLogicalPortID(ctx, device); err != nil {
- logger.Warnw("getLogical-port-error", log.Fields{"deviceId": device.Id, "error": err})
- return err
- }
- if err = dMgr.logicalDeviceMgr.deleteLogicalPort(ctx, lPortID); err != nil {
- logger.Warnw("deleteLogical-port-error", log.Fields{"deviceId": device.Id})
- return err
- }
- return nil
-}
-
// DeleteLogicalPorts removes the logical ports associated with that deviceId
func (dMgr *Manager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Debugw("delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
index daae9ae..c597006 100644
--- a/rw_core/core/device/meter/loader.go
+++ b/rw_core/core/device/meter/loader.go
@@ -30,11 +30,10 @@
// Loader hides all low-level locking & synchronization related to meter state updates
type Loader struct {
+ dbProxy *model.Proxy
// this lock protects the meters map, it does not protect individual meters
lock sync.RWMutex
meters map[uint32]*chunk
-
- dbProxy *model.Proxy
}
// chunk keeps a meter and the lock for this meter
@@ -48,8 +47,8 @@
func NewLoader(dbProxy *model.Proxy) *Loader {
return &Loader{
- meters: make(map[uint32]*chunk),
dbProxy: dbProxy,
+ meters: make(map[uint32]*chunk),
}
}
@@ -86,8 +85,6 @@
loader.lock.Unlock()
if err := loader.dbProxy.Set(ctx, fmt.Sprint(meter.Config.MeterId), meter); err != nil {
- logger.Errorw("failed-adding-meter-to-db", log.Fields{"meterID": meter.Config.MeterId, "err": err})
-
// revert the map
loader.lock.Lock()
delete(loader.meters, meter.Config.MeterId)
@@ -130,6 +127,8 @@
return &Handle{loader: loader, chunk: entry}, true
}
+// Handle is allocated for each Lock() call, all modifications are made using it, and it is invalidated by Unlock()
+// This enforces correct Lock()-Usage()-Unlock() ordering.
type Handle struct {
loader *Loader
chunk *chunk
@@ -173,10 +172,10 @@
}
}
-// List returns a snapshot of all the managed meter IDs
+// ListIDs returns a snapshot of all the managed meter IDs
// TODO: iterating through meters safely is expensive now, since all meters are stored & locked separately
// should avoid this where possible
-func (loader *Loader) List() map[uint32]struct{} {
+func (loader *Loader) ListIDs() map[uint32]struct{} {
loader.lock.RLock()
defer loader.lock.RUnlock()
// copy the IDs so caller can safely iterate
diff --git a/rw_core/coreif/logical_device_agent_if.go b/rw_core/coreif/logical_device_agent_if.go
index ac4201a..33ea5e2 100644
--- a/rw_core/coreif/logical_device_agent_if.go
+++ b/rw_core/coreif/logical_device_agent_if.go
@@ -30,7 +30,7 @@
type LogicalDeviceAgent interface {
GetDeviceRoutes() *route.DeviceRoutes
GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error)
- GetWildcardInputPorts(excludePort ...uint32) []uint32
+ GetWildcardInputPorts(excludePort uint32) map[uint32]struct{}
GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error)
- GetNNIPorts() []uint32
+ GetNNIPorts() map[uint32]struct{}
}
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index eb338db..80a9604 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -140,13 +140,11 @@
logger.Debug("trap-uni")
//inPortNo is 0 for wildcard input case, do not include upstream port for controller bound flow in input
- var inPorts []uint32
+ var inPorts = map[uint32]struct{}{inPortNo: {}}
if inPortNo == 0 {
inPorts = agent.GetWildcardInputPorts(egressHop.Egress) // exclude egress_hop.egress_port.port_no
- } else {
- inPorts = []uint32{inPortNo}
}
- for _, inputPort := range inPorts {
+ for inputPort := range inPorts {
// Upstream flow on parent (olt) device
faParent := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie, "meter_id": uint64(meterID), "write_metadata": metadataFromwriteMetadata},
@@ -457,8 +455,11 @@
//so that a valid path can be found for the flow
nniPorts := agent.GetNNIPorts()
if len(nniPorts) > 0 {
- inPortNo = nniPorts[0]
- logger.Debugw("assigning-nni-port-as-in-port-for-multicast-flow", log.Fields{"nni": nniPorts[0], "flow:": flow})
+ for port := range nniPorts {
+ inPortNo = port
+ break
+ }
+ logger.Debugw("assigning-nni-port-as-in-port-for-multicast-flow", log.Fields{"nni": inPortNo, "flow:": flow})
}
}
outPortNo := fu.GetOutPort(flow)
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index 4c6ca8c..c3bbff7 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -413,15 +413,11 @@
return tfd.defaultRules
}
-func (tfd *testFlowDecomposer) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
- lPorts := make([]uint32, 0)
- var exclPort uint32
- if len(excludePort) == 1 {
- exclPort = excludePort[0]
- }
- for portno := range tfd.logicalPorts {
- if portno != exclPort {
- lPorts = append(lPorts, portno)
+func (tfd *testFlowDecomposer) GetWildcardInputPorts(excludePort uint32) map[uint32]struct{} {
+ lPorts := make(map[uint32]struct{})
+ for portNo := range tfd.logicalPorts {
+ if portNo != excludePort {
+ lPorts[portNo] = struct{}{}
}
}
return lPorts
@@ -449,11 +445,11 @@
return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingressPortNo, egressPortNo)
}
-func (tfd *testFlowDecomposer) GetNNIPorts() []uint32 {
- nniPorts := make([]uint32, 0)
+func (tfd *testFlowDecomposer) GetNNIPorts() map[uint32]struct{} {
+ nniPorts := make(map[uint32]struct{})
for portNo, nni := range tfd.logicalPortsNo {
if nni {
- nniPorts = append(nniPorts, portNo)
+ nniPorts[portNo] = struct{}{}
}
}
return nniPorts
diff --git a/rw_core/route/device_route.go b/rw_core/route/device_route.go
index 3cb8470..5cd75f2 100644
--- a/rw_core/route/device_route.go
+++ b/rw_core/route/device_route.go
@@ -20,11 +20,12 @@
"context"
"errors"
"fmt"
+ "sync"
+
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "sync"
)
var ErrNoRoute = errors.New("no route")
@@ -119,7 +120,7 @@
}
//ComputeRoutes calculates all the routes between the logical ports. This will clear up any existing route
-func (dr *DeviceRoutes) ComputeRoutes(ctx context.Context, lps []*voltha.LogicalPort) error {
+func (dr *DeviceRoutes) ComputeRoutes(ctx context.Context, lps map[uint32]*voltha.LogicalPort) error {
dr.routeBuildLock.Lock()
defer dr.routeBuildLock.Unlock()
@@ -208,7 +209,7 @@
// AddPort augments the current set of routes with new routes corresponding to the logical port "lp". If the routes have
// not been built yet then use logical port "lps" to compute all current routes (lps includes lp)
-func (dr *DeviceRoutes) AddPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+func (dr *DeviceRoutes) AddPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps map[uint32]*voltha.LogicalPort) error {
logger.Debugw("add-port-to-routes", log.Fields{"port": lp, "count-logical-ports": len(lps)})
// Adding NNI port
@@ -221,7 +222,7 @@
}
// AddUNIPort setup routes between the logical UNI port lp and all registered NNI ports
-func (dr *DeviceRoutes) AddUNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+func (dr *DeviceRoutes) AddUNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps map[uint32]*voltha.LogicalPort) error {
logger.Debugw("add-uni-port-to-routes", log.Fields{"port": lp, "count-logical-ports": len(lps)})
dr.routeBuildLock.Lock()
@@ -258,7 +259,7 @@
}
// AddNNIPort setup routes between the logical NNI port lp and all registered UNI ports
-func (dr *DeviceRoutes) AddNNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps []*voltha.LogicalPort) error {
+func (dr *DeviceRoutes) AddNNIPort(ctx context.Context, lp *voltha.LogicalPort, device *voltha.Device, lps map[uint32]*voltha.LogicalPort) error {
logger.Debugw("add-port-to-routes", log.Fields{"port": lp, "logical-ports-count": len(lps), "device-id": device.Id})
dr.routeBuildLock.Lock()
@@ -316,7 +317,7 @@
}
// AddAllPorts setups up new routes using all ports on the device. lps includes the device's logical port
-func (dr *DeviceRoutes) AddAllPorts(ctx context.Context, device *voltha.Device, lps []*voltha.LogicalPort) error {
+func (dr *DeviceRoutes) AddAllPorts(ctx context.Context, device *voltha.Device, lps map[uint32]*voltha.LogicalPort) error {
logger.Debugw("add-all-port-to-routes", log.Fields{"logical-ports-count": len(lps), "device-id": device.Id})
for _, lp := range lps {
if lp.DeviceId == device.Id {
@@ -356,16 +357,16 @@
}
// isUpToDate returns true if device is up to date
-func (dr *DeviceRoutes) isUpToDate(ld *voltha.LogicalDevice) bool {
+func (dr *DeviceRoutes) isUpToDate(ldPorts map[uint32]*voltha.LogicalPort) bool {
dr.routeBuildLock.Lock()
defer dr.routeBuildLock.Unlock()
numNNI, numUNI := 0, 0
- if ld != nil {
- if len(dr.logicalPorts) != len(ld.Ports) {
+ if ldPorts != nil {
+ if len(dr.logicalPorts) != len(ldPorts) {
return false
}
numNNI = len(dr.RootPorts)
- numUNI = len(ld.Ports) - numNNI
+ numUNI = len(ldPorts) - numNNI
}
return len(dr.Routes) == numNNI*numUNI*2
}
diff --git a/rw_core/route/device_route_test.go b/rw_core/route/device_route_test.go
index fbbc802..4fe095b 100644
--- a/rw_core/route/device_route_test.go
+++ b/rw_core/route/device_route_test.go
@@ -19,15 +19,16 @@
"context"
"errors"
"fmt"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/stretchr/testify/assert"
"math/rand"
"reflect"
"strings"
"sync"
"testing"
"time"
+
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/stretchr/testify/assert"
)
const (
@@ -53,7 +54,8 @@
}
type logicalDeviceManager struct {
- logicalDevice *voltha.LogicalDevice
+ logicalDeviceID string
+ ports map[uint32]*voltha.LogicalPort
deviceRoutes *DeviceRoutes
ldChnl chan portRegistration
numLogicalPorts int
@@ -61,8 +63,14 @@
}
func newLogicalDeviceManager(ld *voltha.LogicalDevice, ch chan portRegistration, totalLogicalPorts int, done chan struct{}) *logicalDeviceManager {
+ ports := make(map[uint32]*voltha.LogicalPort)
+ for _, p := range ld.Ports {
+ ports[p.DevicePortNo] = p
+ }
+
return &logicalDeviceManager{
- logicalDevice: ld,
+ logicalDeviceID: ld.Id,
+ ports: ports,
ldChnl: ch,
numLogicalPorts: totalLogicalPorts,
done: done,
@@ -70,7 +78,7 @@
}
func (ldM *logicalDeviceManager) start(getDevice GetDeviceFunc, buildRoutes bool) {
- ldM.deviceRoutes = NewDeviceRoutes(ldM.logicalDevice.Id, getDevice)
+ ldM.deviceRoutes = NewDeviceRoutes(ldM.logicalDeviceID, getDevice)
ofpPortNo := uint32(1)
for portReg := range ldM.ldChnl {
if portReg.port == nil {
@@ -84,15 +92,14 @@
DevicePortNo: portReg.port.PortNo,
RootPort: portReg.rootPort,
}
- ldM.logicalDevice.Ports = append(ldM.logicalDevice.Ports, lp)
+ ldM.ports[lp.DevicePortNo] = lp
if buildRoutes {
device, err := getDevice(context.WithValue(context.Background(), testSetupPhase, true), lp.DeviceId)
if err != nil {
fmt.Println("Error when getting device:", lp.DeviceId, err)
}
- err = ldM.deviceRoutes.AddPort(context.Background(), lp, device, ldM.logicalDevice.Ports)
- if err != nil && !strings.Contains(err.Error(), "code = FailedPrecondition") {
- fmt.Println("(Error when adding port:", lp, len(ldM.logicalDevice.Ports), err)
+ if err := ldM.deviceRoutes.AddPort(context.Background(), lp, device, ldM.ports); err != nil && !strings.Contains(err.Error(), "code = FailedPrecondition") {
+ fmt.Println("(Error when adding port:", lp, len(ldM.ports), err)
}
}
ofpPortNo++
@@ -111,7 +118,7 @@
func newOltManager(oltDeviceID string, ldMgr *logicalDeviceManager, numNNIPort int, numPonPortOnOlt int, ch chan onuRegistration) *oltManager {
return &oltManager{
- olt: &voltha.Device{Id: oltDeviceID, ParentId: ldMgr.logicalDevice.Id, Root: true},
+ olt: &voltha.Device{Id: oltDeviceID, ParentId: ldMgr.logicalDeviceID, Root: true},
logicalDeviceMgr: ldMgr,
numNNIPort: numNNIPort,
numPonPortOnOlt: numPonPortOnOlt,
@@ -185,21 +192,21 @@
func (onuM *onuManager) start(startingOltPeerPortNo int, numPonPortOnOlt int) {
var wg sync.WaitGroup
for oltPonNo := startingOltPeerPortNo; oltPonNo < startingOltPeerPortNo+numPonPortOnOlt; oltPonNo++ {
- for i := 0; i < onuM.numOnus; i++ {
+ for onuID := 0; onuID < onuM.numOnus; onuID++ {
wg.Add(1)
- go func(idx int, oltPonNum int) {
+ go func(onuID int, oltPonNum int) {
var onu *voltha.Device
defer wg.Done()
- id := fmt.Sprintf("%d-onu-%d", oltPonNum, idx)
+ id := fmt.Sprintf("%d-onu-%d", oltPonNum, onuID)
onu = &voltha.Device{Id: id, ParentId: onuM.oltMgr.olt.Id, ParentPortNo: uint32(oltPonNum)}
- ponPort := &voltha.Port{Label: fmt.Sprintf("%s:pon-%d", onu.Id, idx), PortNo: 1, DeviceId: onu.Id, Type: voltha.Port_PON_ONU}
+ ponPort := &voltha.Port{Label: fmt.Sprintf("%s:pon-%d", onu.Id, onuID), PortNo: 1, DeviceId: onu.Id, Type: voltha.Port_PON_ONU}
ponPort.Peers = make([]*voltha.Port_PeerPort, 0)
peerPort := voltha.Port_PeerPort{DeviceId: onuM.oltMgr.olt.Id, PortNo: uint32(oltPonNum)}
ponPort.Peers = append(ponPort.Peers, &peerPort)
onu.Ports = make([]*voltha.Port, 0)
onu.Ports = append(onu.Ports, ponPort)
for j := onuM.startingUniPortNo; j < onuM.numUnisPerOnu+onuM.startingUniPortNo; j++ {
- uniPort := &voltha.Port{Label: fmt.Sprintf("%s:uni-%d", onu.Id, j), PortNo: uint32(j), DeviceId: onu.Id, Type: voltha.Port_ETHERNET_UNI}
+ uniPort := &voltha.Port{Label: fmt.Sprintf("%s:uni-%d", onu.Id, j), PortNo: uint32(oltPonNum)<<12 + uint32(onuID+1)<<4 + uint32(j), DeviceId: onu.Id, Type: voltha.Port_ETHERNET_UNI}
onu.Ports = append(onu.Ports, uniPort)
}
onuM.deviceLock.Lock()
@@ -210,7 +217,7 @@
oltPonNo: uint32(oltPonNum),
onuPonNo: 1,
}
- }(i, oltPonNo)
+ }(onuID, oltPonNo)
}
}
wg.Wait()
@@ -278,17 +285,17 @@
// Computes the routes
start := time.Now()
- err := ldMgr.deviceRoutes.ComputeRoutes(context.TODO(), ldMgr.logicalDevice.Ports)
+ err := ldMgr.deviceRoutes.ComputeRoutes(context.TODO(), ldMgr.ports)
assert.Nil(t, err)
// Validate the routes are up to date
- assert.True(t, ldMgr.deviceRoutes.isUpToDate(ld))
+ assert.True(t, ldMgr.deviceRoutes.isUpToDate(ldMgr.ports))
// Validate the expected number of routes
assert.EqualValues(t, 2*numNNIPort*numPonPortOnOlt*numOnuPerOltPonPort*numUniPerOnu, len(ldMgr.deviceRoutes.Routes))
// Validate the root ports
- for _, port := range ldMgr.logicalDevice.Ports {
+ for _, port := range ldMgr.ports {
assert.Equal(t, port.RootPort, ldMgr.deviceRoutes.IsRootPort(port.OfpPort.PortNo))
}
fmt.Println(fmt.Sprintf("Total Time:%dms, Total Routes:%d NumGetDeviceInvoked:%d", time.Since(start)/time.Millisecond, len(ldMgr.deviceRoutes.Routes), onuMgr.numGetDeviceInvoked))
@@ -326,13 +333,13 @@
ldMgr.deviceRoutes.Print()
// Validate the routes are up to date
- assert.True(t, ldMgr.deviceRoutes.isUpToDate(ld))
+ assert.True(t, ldMgr.deviceRoutes.isUpToDate(ldMgr.ports))
// Validate the expected number of routes
assert.EqualValues(t, 2*numNNIPort*numPonPortOnOlt*numOnuPerOltPonPort*numUniPerOnu, len(ldMgr.deviceRoutes.Routes))
// Validate the root ports
- for _, port := range ldMgr.logicalDevice.Ports {
+ for _, port := range ldMgr.ports {
assert.Equal(t, port.RootPort, ldMgr.deviceRoutes.IsRootPort(port.OfpPort.PortNo))
}
@@ -367,7 +374,7 @@
close(oltMgrChnl1)
close(ldMgrChnl1)
- err := ldMgr1.deviceRoutes.ComputeRoutes(context.TODO(), ldMgr1.logicalDevice.Ports)
+ err := ldMgr1.deviceRoutes.ComputeRoutes(context.TODO(), ldMgr1.ports)
assert.Nil(t, err)
routesGeneratedAllAtOnce := ldMgr1.deviceRoutes.Routes
diff --git a/tests/core/api/grpc_nbi_api_handler_client_test.go b/tests/core/api/grpc_nbi_api_handler_client_test.go
index ac9da6c..53dcd07 100644
--- a/tests/core/api/grpc_nbi_api_handler_client_test.go
+++ b/tests/core/api/grpc_nbi_api_handler_client_test.go
@@ -361,7 +361,9 @@
for _, ld := range lresponse.Items {
logicalDevices[ld.Id] = ld
// Ensure each logical device have two ports
- assert.Equal(t, 2, len(ld.Ports))
+ ports, err := stub.ListLogicalDevicePorts(ctx, &voltha.ID{Id: ld.Id})
+ assert.Nil(t, err)
+ assert.Equal(t, 2, len(ports.Items))
}
//7. Disable all ONUs & check status & check logical device
@@ -399,7 +401,9 @@
for _, ld := range lresponse.Items {
logicalDevices[ld.Id] = ld
// Ensure each logical device have one port - only olt port
- assert.Equal(t, 1, len(ld.Ports))
+ ports, err := stub.ListLogicalDevicePorts(ctx, &common.ID{Id: ld.Id})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(ports.Items))
}
//8. Enable all ONUs & check status & check logical device