[VOL-3005] Separate Flows from Device
Also, some unit test functions were moved to a test utility class.
The new loaders and Proxy implementation are applied.
Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 357c49a..17ac266 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -34,6 +34,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/common"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
@@ -50,6 +51,7 @@
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
+ dbPath *model.Path
dProxy *model.Proxy
coreInstanceID string
defaultTimeout time.Duration
@@ -57,13 +59,15 @@
deviceLoadingInProgress map[string][]chan int
}
-func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+// NewManagers creates the device Manager and the LogicalManager.
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
coreInstanceID: coreInstanceID,
- dProxy: dbProxy.Proxy("devices"),
+ dbPath: dbPath,
+ dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout * time.Millisecond,
deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,8 +78,8 @@
Manager: event.NewManager(),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
- dbProxy: dbProxy,
- ldProxy: dbProxy.Proxy("logical_devices"),
+ dbPath: dbPath,
+ ldProxy: dbPath.Proxy("logical_devices"),
defaultTimeout: defaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
@@ -157,7 +161,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -220,22 +224,34 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
- device, err := dMgr.getDevice(ctx, id.Id)
- if err != nil {
- return &ofp.Flows{}, err
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return &ofp.Flows{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
- return device.Flows, nil
+
+ flows := agent.listDeviceFlows()
+ ctr, ret := 0, make([]*ofp.OfpFlowStats, len(flows))
+ for _, flow := range flows {
+ ret[ctr] = flow
+ ctr++
+ }
+ return &openflow_13.Flows{Items: ret}, nil
}
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
-
- device, err := dMgr.getDevice(ctx, id.Id)
- if err != nil {
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
- return device.GetFlowGroups(), nil
+ groups := agent.listDeviceGroups()
+ ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
+ for _, group := range groups {
+ ret[ctr] = group
+ ctr++
+ }
+ return &voltha.FlowGroups{Items: ret}, nil
}
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -408,7 +424,7 @@
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
@@ -471,7 +487,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
@@ -1029,7 +1045,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})