[VOL-1037, VOL-1035] This commit consists of flow and groups
handling (from NBI to Adapters, including decomposition),
Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 52ab584..e045fc9 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,16 +17,18 @@
import (
"context"
- "reflect"
- "sync"
-
+ "fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/protos/core_adapter"
+ ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
+ fu "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "reflect"
+ "sync"
)
type DeviceAgent struct {
@@ -37,6 +39,8 @@
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
exitChannel chan int
+ flowProxy *model.Proxy
+ groupProxy *model.Proxy
lockDevice sync.RWMutex
}
@@ -48,6 +52,13 @@
cloned := (proto.Clone(device)).(*voltha.Device)
cloned.Id = CreateDeviceId()
cloned.AdminState = voltha.AdminState_PREPROVISIONED
+ cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+ cloned.Flows = &ofp.Flows{Items: nil}
+ if !device.GetRoot() && device.ProxyAddress != nil {
+ // Set the default vlan ID to the one specified by the parent adapter. It can be
+ // overwritten by the child adapter during a device update request
+ cloned.Vlan = device.ProxyAddress.ChannelId
+ }
agent.deviceId = cloned.Id
agent.lastData = cloned
agent.deviceMgr = deviceMgr
@@ -68,6 +79,17 @@
}
agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
+
+ agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+ fmt.Sprintf("/devices/%s/flows", agent.deviceId),
+ false)
+ agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+ fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
+ false)
+
+ agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
+ //agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+
log.Debug("device-agent-started")
}
@@ -80,7 +102,7 @@
log.Debug("device-agent-stopped")
}
-// getDevice retrieves the latest device information from the data model
+// GetDevice retrieves the latest device information from the data model
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -146,6 +168,51 @@
return nil
}
+func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
+ var oldData *voltha.Flows
+ if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
+ log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
+ // store the changed data
+ storedData.Flows.Items = flows
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+
+ // For now, force the callback to occur
+ go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ var oldData *voltha.FlowGroups
+ log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
+ if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
+ // store the changed data
+ storedData.FlowGroups.Items = groups
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+
+ // For now, force the callback to occur
+ go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
+ return nil
+ }
+}
+
//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
@@ -257,7 +324,7 @@
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
+ if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
for _, port := range device.Ports {
if port.Type == portType {
ports.Items = append(ports.Items, port)
@@ -271,7 +338,7 @@
// parent device
func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
- if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
+ if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
var switchCap *core_adapter.SwitchCapability
@@ -288,7 +355,7 @@
// device
func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
- if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
+ if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
var portCap *core_adapter.PortCapability
@@ -500,6 +567,164 @@
}
}
+//flowTableUpdated is the callback after flows have been updated in the model to push them
+//to the adapters
+func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
+ log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
+
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+
+ var previousData *voltha.Flows
+ var latestData *voltha.Flows
+
+ var ok bool
+ if previousData, ok = args[0].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ return nil
+ }
+ if latestData, ok = args[1].(*ofp.Flows); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ return nil
+ }
+
+ // Sanity check - should not happen as this is already handled in logical device agent
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
+ return nil
+ }
+
+ var device *voltha.Device
+ var err error
+ if device, err = agent.getDeviceWithoutLock(); err != nil {
+ log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
+ return nil
+ }
+ groups := device.FlowGroups
+
+ // Send update to adapters
+ // Check whether the device supports incremental flow changes
+ // Assume false for test
+ acceptsAddRemoveFlowUpdates := false
+ if !acceptsAddRemoveFlowUpdates {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
+ log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ return nil
+ }
+ // Incremental flow changes accepted
+ var toAdd []*ofp.OfpFlowStats
+ var toDelete []*ofp.OfpFlowStats
+
+ for _, flow := range latestData.Items {
+ if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
+ toAdd = append(toAdd, flow)
+ }
+ }
+ for _, flow := range previousData.Items {
+ if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
+ toDelete = append(toDelete, flow)
+ }
+ }
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: toAdd},
+ ToRemove: &voltha.Flows{Items: toDelete},
+ }
+ // Send an empty group changes as it would be dealt with a call to groupTableUpdated
+ groupChanges := &ofp.FlowGroupChanges{}
+
+ // Send changes only
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
+ log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+
+ return nil
+}
+
+//groupTableUpdated is the callback after group table has been updated in the model to push them
+//to the adapters
+func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
+ log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
+
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+
+ var previousData *voltha.FlowGroups
+ var latestData *voltha.FlowGroups
+
+ var ok bool
+ if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ return nil
+ }
+ if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ return nil
+ }
+
+ // Sanity check - should not happen as this is already handled in logical device agent
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
+ return nil
+ }
+
+ var device *voltha.Device
+ var err error
+ if device, err = agent.getDeviceWithoutLock(); err != nil {
+ log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
+ return nil
+ }
+ flows := device.Flows
+
+ // Send update to adapters
+ // Check whether the device supports incremental flow changes
+ // Assume false for test
+ acceptsAddRemoveFlowUpdates := false
+ if !acceptsAddRemoveFlowUpdates {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
+ log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ return nil
+ }
+
+ // Incremental group changes accepted
+ var toAdd []*ofp.OfpGroupEntry
+ var toDelete []*ofp.OfpGroupEntry
+ var toUpdate []*ofp.OfpGroupEntry
+
+ for _, group := range latestData.Items {
+ if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
+ toAdd = append(toAdd, group)
+ } else { // existed before
+ if previousData.Items[idx].String() != group.String() { // there is a change
+ toUpdate = append(toUpdate, group)
+ }
+ }
+ }
+ for _, group := range previousData.Items {
+ if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
+ toDelete = append(toDelete, group)
+ }
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: toAdd},
+ ToRemove: &voltha.FlowGroups{Items: toDelete},
+ ToUpdate: &voltha.FlowGroups{Items: toUpdate},
+ }
+ // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
+ flowChanges := &ofp.FlowChanges{}
+
+ // Send changes only
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
+ log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
agent.lockDevice.Lock()