VOL-3121 - Separated out logical ports from logical agent.
Similar to flows/groups/meters.
Also modified device_route tests to generate unique port IDs (`.OfpPort.PortNo`s) across all UNI ports within each test, i.e. within an OLT.
Also replaced logicalPortsNo map & associated NNI vs UNI logic with root device checks.
Change-Id: Ib0cecbf7d4f8d509ce7c989b9ccf697c8b0d17d6
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index f943106..c14750d 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -19,7 +19,6 @@
import (
"context"
"encoding/hex"
- "fmt"
"sync"
"time"
@@ -27,6 +26,7 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
+ "github.com/opencord/voltha-go/rw_core/core/device/logical_port"
"github.com/opencord/voltha-go/rw_core/core/device/meter"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
"github.com/opencord/voltha-go/rw_core/route"
@@ -42,26 +42,26 @@
// LogicalAgent represent attributes of logical device agent
type LogicalAgent struct {
- logicalDeviceID string
- serialNumber string
- rootDeviceID string
- deviceMgr *Manager
- ldeviceMgr *LogicalManager
- ldProxy *model.Proxy
- stopped bool
- deviceRoutes *route.DeviceRoutes
- logicalPortsNo map[uint32]bool //value is true for NNI port
- lockLogicalPortsNo sync.RWMutex
- flowDecomposer *fd.FlowDecomposer
- defaultTimeout time.Duration
- logicalDevice *voltha.LogicalDevice
- requestQueue *coreutils.RequestQueue
- startOnce sync.Once
- stopOnce sync.Once
+ logicalDeviceID string
+ serialNumber string
+ rootDeviceID string
+ deviceMgr *Manager
+ ldeviceMgr *LogicalManager
+ ldProxy *model.Proxy
+ stopped bool
+ deviceRoutes *route.DeviceRoutes
+ flowDecomposer *fd.FlowDecomposer
+ defaultTimeout time.Duration
+ logicalDevice *voltha.LogicalDevice
+ requestQueue *coreutils.RequestQueue
+ orderedEvents orderedEvents
+ startOnce sync.Once
+ stopOnce sync.Once
flowLoader *flow.Loader
meterLoader *meter.Loader
groupLoader *group.Loader
+ portLoader *port.Loader
}
func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
@@ -74,13 +74,13 @@
ldProxy: ldProxy,
ldeviceMgr: ldeviceMgr,
flowDecomposer: fd.NewFlowDecomposer(deviceMgr),
- logicalPortsNo: make(map[uint32]bool),
defaultTimeout: defaultTimeout,
requestQueue: coreutils.NewRequestQueue(),
flowLoader: flow.NewLoader(dbProxy.SubPath("logical_flows").Proxy(id)),
groupLoader: group.NewLoader(dbProxy.SubPath("logical_groups").Proxy(id)),
meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
+ portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
return agent
@@ -134,7 +134,7 @@
}
logger.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
- agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
+ agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
@@ -158,14 +158,13 @@
agent.rootDeviceID = ld.RootDeviceId
// Update the last data
- agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
+ agent.logicalDevice = ld
- // Setup the local list of logical ports
- agent.addLogicalPortsToMap(ld.Ports)
// load the flows, meters and groups from KV to cache
agent.flowLoader.Load(ctx)
agent.meterLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
}
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
@@ -200,6 +199,8 @@
} else {
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
+ // TODO: remove all entries from all loaders
+ // TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
@@ -217,28 +218,6 @@
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
-// getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
-func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
- logger.Debug("getLogicalDeviceWithoutLock")
- return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
-}
-
-//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
-func (agent *LogicalAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
- if agent.stopped {
- return fmt.Errorf("logical device agent stopped-%s", logicalDevice.Id)
- }
-
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if err := agent.ldProxy.Set(updateCtx, agent.logicalDeviceID, logicalDevice); err != nil {
- logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
- return err
- }
-
- agent.logicalDevice = logicalDevice
- return nil
-}
-
func (agent *LogicalAgent) addFlowsAndGroupsToDevices(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})