VOL-3121 - Separated ports from devices.
This follows the same approach already used for flows, groups, meters, and logical ports.
Added ListDevicePorts and GetDevicePort to the adapter API.
Removed the unused tests tagged `// +build integration`.
Change-Id: I586adb9f46a249c9430d4205ef5db2d105dbbe06
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 2b18296..9fbeb9d 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -29,6 +29,7 @@
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
+ "github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -48,7 +49,7 @@
deviceID string
parentID string
deviceType string
- isRootdevice bool
+ isRootDevice bool
adapterProxy *remote.AdapterProxy
adapterMgr *adapter.Manager
deviceMgr *Manager
@@ -63,32 +64,33 @@
flowLoader *flow.Loader
groupLoader *group.Loader
+ portLoader *port.Loader
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbProxy *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
- var agent Agent
- agent.adapterProxy = ap
- if device.Id == "" {
- agent.deviceID = coreutils.CreateDeviceID()
- } else {
- agent.deviceID = device.Id
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+ deviceID := device.Id
+ if deviceID == "" {
+ deviceID = coreutils.CreateDeviceID()
}
- agent.isRootdevice = device.Root
- agent.parentID = device.ParentId
- agent.deviceType = device.Type
- agent.deviceMgr = deviceMgr
- agent.adapterMgr = deviceMgr.adapterMgr
- agent.exitChannel = make(chan int, 1)
- agent.dbProxy = deviceProxy
- agent.defaultTimeout = timeout
- agent.device = proto.Clone(device).(*voltha.Device)
- agent.requestQueue = coreutils.NewRequestQueue()
- agent.flowLoader = flow.NewLoader(dbProxy.SubPath("flows").Proxy(device.Id))
- agent.groupLoader = group.NewLoader(dbProxy.SubPath("groups").Proxy(device.Id))
-
- return &agent
+ return &Agent{
+ deviceID: deviceID,
+ adapterProxy: ap,
+ isRootDevice: device.Root,
+ parentID: device.ParentId,
+ deviceType: device.Type,
+ deviceMgr: deviceMgr,
+ adapterMgr: deviceMgr.adapterMgr,
+ exitChannel: make(chan int, 1),
+ dbProxy: deviceProxy,
+ defaultTimeout: timeout,
+ device: proto.Clone(device).(*voltha.Device),
+ requestQueue: coreutils.NewRequestQueue(),
+ flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
+ groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
+ portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+ }
}
// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
@@ -124,6 +126,7 @@
// load the flows and groups from KV to cache
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
@@ -134,8 +137,6 @@
device = (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
device.AdminState = voltha.AdminState_PREPROVISIONED
- device.FlowGroups = &ofp.FlowGroups{Items: nil}
- device.Flows = &ofp.Flows{Items: nil}
if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
// Set the default vlan ID to the one specified by the parent adapter. It can be
// overwritten by the child adapter during a device update request
@@ -202,6 +203,7 @@
agent.device = device
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
+ agent.portLoader.Load(ctx)
logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
@@ -714,27 +716,26 @@
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
- //Remove the associated peer ports on the parent device
- parentDevice := agent.getDeviceWithoutLock()
- var updatedPeers []*voltha.Port_PeerPort
- for _, port := range parentDevice.Ports {
- updatedPeers = make([]*voltha.Port_PeerPort, 0)
- for _, peerPort := range port.Peers {
- if peerPort.DeviceId != device.Id {
- updatedPeers = append(updatedPeers, peerPort)
+ // Remove the associated peer ports on the parent device
+ for portID := range agent.portLoader.ListIDs() {
+ if portHandle, have := agent.portLoader.Lock(portID); have {
+ oldPort := portHandle.GetReadOnly()
+ updatedPeers := make([]*voltha.Port_PeerPort, 0)
+ for _, peerPort := range oldPort.Peers {
+ if peerPort.DeviceId != device.Id {
+ updatedPeers = append(updatedPeers, peerPort)
+ }
}
+ newPort := *oldPort
+ newPort.Peers = updatedPeers
+ if err := portHandle.Update(ctx, &newPort); err != nil {
+ portHandle.Unlock()
+ return nil
+ }
+ portHandle.Unlock()
}
- port.Peers = updatedPeers
- }
- if err := agent.updateDeviceInStoreWithoutLock(ctx, parentDevice, false, ""); err != nil {
- return err
}
//send request to adapter