[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9704fff..836269e 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,7 +17,6 @@
import (
"context"
- "fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
@@ -34,6 +33,7 @@
type DeviceAgent struct {
deviceId string
deviceType string
+ isRootdevice bool
lastData *voltha.Device
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
@@ -41,14 +41,13 @@
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
exitChannel chan int
- flowProxy *model.Proxy
- groupProxy *model.Proxy
lockDevice sync.RWMutex
+ defaultTimeout int64
}
//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
//preprovisioning
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
cloned := (proto.Clone(device)).(*voltha.Device)
@@ -63,6 +62,7 @@
// overwritten by the child adapter during a device update request
cloned.Vlan = device.ProxyAddress.ChannelId
}
+ agent.isRootdevice = device.Root
agent.deviceId = cloned.Id
agent.deviceType = cloned.Type
agent.lastData = cloned
@@ -71,6 +71,7 @@
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
agent.lockDevice = sync.RWMutex{}
+ agent.defaultTimeout = timeout
return &agent
}
@@ -101,16 +102,6 @@
agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
- agent.flowProxy = agent.clusterDataProxy.CreateProxy(
- fmt.Sprintf("/devices/%s/flows", agent.deviceId),
- false)
- agent.groupProxy = agent.clusterDataProxy.CreateProxy(
- fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
- false)
-
- agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
log.Debug("device-agent-started")
return nil
}
@@ -201,38 +192,117 @@
return nil
}
-func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
- var oldData *voltha.Flows
- if storedData, err := agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceId)
- } else {
- oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
- log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
-
- // store the changed data
- afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
-
- return nil
+func (agent *DeviceAgent) updateDeviceWithoutLockAsync(device *voltha.Device, ch chan interface{}) {
+ if err := agent.updateDeviceWithoutLock(device); err != nil {
+ ch <- status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
}
+ ch <- nil
}
-func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+ log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ ch <- err
+ }
+ ch <- nil
+}
+
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+ log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ ch <- err
+ }
+ ch <- nil
+}
+
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+ if (len(newFlows) | len(newGroups)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
- if _, err := agent.getDeviceWithoutLock(); err != nil {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ var existingFlows *voltha.Flows
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
+ existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
+ existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+ log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
+
+ var updatedFlows []*ofp.OfpFlowStats
+ var flowsToDelete []*ofp.OfpFlowStats
+ var groupsToDelete []*ofp.OfpGroupEntry
+ var updatedGroups []*ofp.OfpGroupEntry
+
+ // Process flows
+ for _, flow := range newFlows {
+ updatedFlows = append(updatedFlows, flow)
+ }
+
+ for _, flow := range existingFlows.Items {
+ if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+ updatedFlows = append(updatedFlows, flow)
+ } else {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ }
+
+ // Process groups
+ for _, g := range newGroups {
+ updatedGroups = append(updatedGroups, g)
+ }
+
+ for _, group := range existingGroups.Items {
+ if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+ updatedGroups = append(updatedGroups, group)
+ } else {
+ groupsToDelete = append(groupsToDelete, group)
+ }
+ }
+
+ // Sanity check
+ if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+
+ }
+ // Send update to adapters
+ chAdapters := make(chan interface{})
+ defer close(chAdapters)
+ chdB := make(chan interface{})
+ defer close(chdB)
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+
+ if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: newFlows},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: newGroups},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ }
+
// store the changed data
- afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ device.Flows = &voltha.Flows{Items: updatedFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -677,7 +747,6 @@
func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
agent.lockDevice.Lock()
- //defer agent.lockDevice.Unlock()
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
agent.lockDevice.Unlock()
@@ -707,7 +776,6 @@
func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
agent.lockDevice.Lock()
- //defer agent.lockDevice.Unlock()
// Work only on latest data
// TODO: Get list of ports from device directly instead of the entire device
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
@@ -766,7 +834,7 @@
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addLogicalPortToMap", log.Fields{"deviceId": agent.deviceId})
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
@@ -775,7 +843,7 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
if cloned.Ports == nil {
// First port
- log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addLogicalPortToMap-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
cloned.Ports = make([]*voltha.Port, 0)
}
cp := proto.Clone(port).(*voltha.Port)
@@ -822,160 +890,6 @@
}
}
-//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
- log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
-
- var previousData *voltha.Flows
- var latestData *voltha.Flows
-
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- return nil
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- return nil
- }
-
- // Sanity check - should not happen as this is already handled in logical device agent
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
- return nil
- }
-
- var device *voltha.Device
- var err error
- if device, err = agent.getDeviceWithoutLock(); err != nil {
- log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
- return nil
- }
- groups := device.FlowGroups
-
- // Send update to adapters
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if !dType.AcceptsAddRemoveFlowUpdates {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
- }
- // Incremental flow changes accepted
- var toAdd []*ofp.OfpFlowStats
- var toDelete []*ofp.OfpFlowStats
-
- for _, flow := range latestData.Items {
- if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
- toAdd = append(toAdd, flow)
- }
- }
- for _, flow := range previousData.Items {
- if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
- toDelete = append(toDelete, flow)
- }
- }
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: toAdd},
- ToRemove: &voltha.Flows{Items: toDelete},
- }
- // Send an empty group changes as it would be dealt with a call to groupTableUpdated
- groupChanges := &ofp.FlowGroupChanges{}
-
- // Send changes only
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
-
- return nil
-}
-
-//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
- log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
-
- var previousData *voltha.FlowGroups
- var latestData *voltha.FlowGroups
-
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- return nil
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- return nil
- }
-
- // Sanity check - should not happen as this is already handled in logical device agent
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
- return nil
- }
-
- var device *voltha.Device
- var err error
- if device, err = agent.getDeviceWithoutLock(); err != nil {
- log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
- return nil
- }
- flows := device.Flows
-
- // Send update to adapters
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if !dType.AcceptsAddRemoveFlowUpdates {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
- log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
- }
-
- // Incremental group changes accepted
- var toAdd []*ofp.OfpGroupEntry
- var toDelete []*ofp.OfpGroupEntry
- var toUpdate []*ofp.OfpGroupEntry
-
- for _, group := range latestData.Items {
- if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
- toAdd = append(toAdd, group)
- } else { // existed before
- if previousData.Items[idx].String() != group.String() { // there is a change
- toUpdate = append(toUpdate, group)
- }
- }
- }
- for _, group := range previousData.Items {
- if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
- toDelete = append(toDelete, group)
- }
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: toAdd},
- ToRemove: &voltha.FlowGroups{Items: toDelete},
- ToUpdate: &voltha.FlowGroups{Items: toUpdate},
- }
- // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
- flowChanges := &ofp.FlowChanges{}
-
- // Send changes only
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
- log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
-}
-
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
agent.lockDevice.Lock()