[VOL-2164] Update rw-core to use the Async Kafka API
This commit consists of the following:
1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering. With
recent changes in the model, along with keeping the request lock
to a minimum, these latencies are reduced. Testing did not
show any noticeable latencies.
2) Keep the request lock from the moment a request started
processing to the moment that request is sent to kafka (when
applicable). Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction. The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.
3) Adapter requests are processed to completion before sending a
response back to the adapter. Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter. Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.
4) Some changes are made when retrieving a handler to execute
a device state transition. This was necessary as there was some
transition overlap found.
Update after multiple reviews.
Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index dfc3907..4e029c8 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,14 +21,10 @@
"encoding/hex"
"errors"
"fmt"
- "github.com/opencord/voltha-go/rw_core/route"
- "reflect"
- "sync"
- "time"
-
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
+ "github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -37,6 +33,13 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "reflect"
+ "sync"
+ "time"
+)
+
+const (
+ maxOrderedLogicalDeviceRequestQueueSize = 1000
)
// LogicalDeviceAgent represent attributes of logical device agent
@@ -53,19 +56,20 @@
meterProxy *model.Proxy
ldProxy *model.Proxy
portProxies map[string]*model.Proxy
- portProxiesLock sync.RWMutex
- lockLogicalDevice sync.RWMutex
lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
lockLogicalPortsNo sync.RWMutex
flowDecomposer *fd.FlowDecomposer
- defaultTimeout int64
+ defaultTimeout time.Duration
logicalDevice *voltha.LogicalDevice
+ requestQueue *coreutils.RequestQueue
+ startOnce sync.Once
+ stopOnce sync.Once
}
func newLogicalDeviceAgent(id string, deviceID string, ldeviceMgr *LogicalDeviceManager,
deviceMgr *DeviceManager,
- cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
+ cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceID = id
@@ -74,27 +78,40 @@
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
- agent.lockLogicalDevice = sync.RWMutex{}
agent.portProxies = make(map[string]*model.Proxy)
- agent.portProxiesLock = sync.RWMutex{}
- agent.lockLogicalPortsNo = sync.RWMutex{}
- agent.lockDeviceRoutes = sync.RWMutex{}
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
+ agent.requestQueue = coreutils.NewRequestQueue(agent.logicalDeviceID, maxOrderedLogicalDeviceRequestQueueSize)
return &agent
}
// start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
- log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "loadFromdB": loadFromdB})
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromDB bool) error {
+ needToStart := false
+ if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+ return nil
+ }
+
+ log.Infow("starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+
+ var startSucceeded bool
+ defer func() {
+ if !startSucceeded {
+ if err := agent.stop(ctx); err != nil {
+ log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }
+ }()
+
+ // Launch the request queue - it will launch a go routine
+ agent.requestQueue.Start()
+
var ld *voltha.LogicalDevice
- var err error
- if !loadFromdB {
+ if !loadFromDB {
//Build the logical device based on information retrieved from the device adapter
var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceID); err != nil {
- log.Errorw("error-creating-logical-device", log.Fields{"error": err})
return err
}
ld = &voltha.LogicalDevice{Id: agent.logicalDeviceID, RootDeviceId: agent.rootDeviceID}
@@ -102,7 +119,6 @@
// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
var datapathID uint64
if datapathID, err = CreateDataPathID(agent.logicalDeviceID); err != nil {
- log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
return err
}
ld.DatapathId = datapathID
@@ -113,42 +129,35 @@
ld.FlowGroups = &ofp.FlowGroups{Items: nil}
ld.Ports = []*voltha.LogicalPort{}
- agent.lockLogicalDevice.Lock()
// Save the logical device
added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
if err != nil {
- log.Errorw("failed-to-save-logical-devices-to-cluster-proxy", log.Fields{"error": err})
- agent.lockLogicalDevice.Unlock()
return err
}
if added == nil {
- log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ log.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
} else {
- log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ log.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
}
agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
- agent.lockLogicalDevice.Unlock()
- // TODO: Set the logical ports in a separate call once the port update issue is fixed.
+ // Setup the logicalports - internal processing, no need to propagate the client context
go func() {
- err := agent.setupLogicalPorts(ctx)
+ err := agent.setupLogicalPorts(context.Background())
if err != nil {
log.Errorw("unable-to-setup-logical-ports", log.Fields{"error": err})
}
}()
-
} else {
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
- agent.lockLogicalDevice.Lock()
logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
if err != nil {
- return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+ return err
}
ld, ok := logicalDevice.(*voltha.LogicalDevice)
if !ok {
- agent.lockLogicalDevice.Unlock()
return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
}
// Update the root device Id
@@ -157,20 +166,16 @@
// Update the last data
agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
- agent.lockLogicalDevice.Unlock()
-
// Setup the local list of logical ports
agent.addLogicalPortsToMap(ld.Ports)
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ var err error
agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
false)
if err != nil {
- log.Errorw("failed-to-create-flow-proxy", log.Fields{"error": err})
return err
}
agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
@@ -201,89 +206,118 @@
if agent.ldProxy != nil {
agent.ldProxy.RegisterCallback(model.PostUpdate, agent.portUpdated)
} else {
- log.Errorw("logical-device-proxy-null", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return status.Error(codes.Internal, "logical-device-proxy-null")
}
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
- if loadFromdB {
+ if loadFromDB {
go func() {
if err := agent.buildRoutes(context.Background()); err != nil {
- log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ log.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
}
+ startSucceeded = true
+
return nil
}
-// stop stops the logical devuce agent. This removes the logical device from the data model.
+// stop stops the logical device agent. This removes the logical device from the data model.
func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
- log.Info("stopping-logical_device-agent")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ var returnErr error
+ agent.stopOnce.Do(func() {
+ log.Info("stopping-logical_device-agent")
- //Remove the logical device from the model
- if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
- log.Errorw("failed-to-remove-device", log.Fields{"error": err})
- return err
- } else if removed == nil {
- log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
- } else {
- log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
- }
- agent.exitChannel <- 1
- log.Info("logical_device-agent-stopped")
- return nil
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ // This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
+ returnErr = err
+ return
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ //Remove the logical device from the model
+ if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+ returnErr = err
+ } else if removed == nil {
+ returnErr = status.Errorf(codes.Aborted, "failed-to-remove-logical-ldevice-%s", agent.logicalDeviceID)
+ } else {
+ log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ }
+
+ // Stop the request queue and request complete indication
+ agent.requestQueue.Stop()
+
+ close(agent.exitChannel)
+
+ log.Info("logical_device-agent-stopped")
+ })
+ return returnErr
}
// GetLogicalDevice returns the latest logical device data
-func (agent *LogicalDeviceAgent) GetLogicalDevice() *voltha.LogicalDevice {
- agent.lockLogicalDevice.RLock()
- defer agent.lockLogicalDevice.RUnlock()
-
- return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
+func (agent *LogicalDeviceAgent) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
// ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() *ofp.Flows {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
log.Debug("ListLogicalDeviceFlows")
- logicalDevice := agent.GetLogicalDevice()
- if logicalDevice.Flows == nil {
- return &ofp.Flows{}
+ logicalDevice, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return nil, err
}
- return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows)
+ if logicalDevice.Flows == nil {
+ return &ofp.Flows{}, nil
+ }
+ return (proto.Clone(logicalDevice.Flows)).(*ofp.Flows), nil
}
// ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() *ofp.Meters {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
log.Debug("ListLogicalDeviceMeters")
- logicalDevice := agent.GetLogicalDevice()
- if logicalDevice.Meters == nil {
- return &ofp.Meters{}
+ logicalDevice, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return nil, err
}
- return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters)
+ if logicalDevice.Meters == nil {
+ return &ofp.Meters{}, nil
+ }
+ return (proto.Clone(logicalDevice.Meters)).(*ofp.Meters), nil
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() *ofp.FlowGroups {
+func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
log.Debug("ListLogicalDeviceFlowGroups")
- logicalDevice := agent.GetLogicalDevice()
- if logicalDevice.FlowGroups == nil {
- return &ofp.FlowGroups{}
+ logicalDevice, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return nil, err
}
- return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups)
+ if logicalDevice.FlowGroups == nil {
+ return &ofp.FlowGroups{}, nil
+ }
+ return (proto.Clone(logicalDevice.FlowGroups)).(*ofp.FlowGroups), nil
}
// ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() *voltha.LogicalPorts {
+func (agent *LogicalDeviceAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
log.Debug("ListLogicalDevicePorts")
- logicalDevice := agent.GetLogicalDevice()
+ logicalDevice, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if logicalDevice == nil {
+ return &voltha.LogicalPorts{}, nil
+ }
lPorts := make([]*voltha.LogicalPort, 0)
lPorts = append(lPorts, logicalDevice.Ports...)
- return &voltha.LogicalPorts{Items: lPorts}
+ return &voltha.LogicalPorts{Items: lPorts}, nil
}
//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
@@ -379,7 +413,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(child *voltha.Device) {
- if err = agent.setupUNILogicalPorts(ctx, child); err != nil {
+ if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
@@ -420,8 +454,10 @@
// updatePortState updates the port state of the device
func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
log.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
cloned := agent.getLogicalDeviceWithoutLock()
for idx, lPort := range cloned.Ports {
@@ -447,8 +483,10 @@
// updatePortsState updates the ports state related to the device
func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
cloned := agent.getLogicalDeviceWithoutLock()
for _, lport := range cloned.Ports {
@@ -494,8 +532,10 @@
// deleteAllLogicalPorts deletes all logical ports associated with this device
func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
ld := agent.getLogicalDeviceWithoutLock()
@@ -522,8 +562,10 @@
// deleteAllUNILogicalPorts deletes all UNI logical ports associated with this parent device
func (agent *LogicalDeviceAgent) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
log.Debugw("delete-all-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Get the latest logical device info
ld := agent.getLogicalDeviceWithoutLock()
@@ -558,7 +600,9 @@
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
}
- agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+ //agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+ agent.logicalDevice = logicalDevice
+
return nil
}
@@ -568,12 +612,15 @@
agent.lockDeviceRoutes.Lock()
defer agent.lockDeviceRoutes.Unlock()
- ld := agent.GetLogicalDevice()
+ ld, err := agent.GetLogicalDevice(ctx)
+ if err != nil {
+ return err
+ }
if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
return nil
}
- log.Debug("Generation of device graph required")
+ log.Debug("Generation of device route required")
if err := agent.buildRoutes(ctx); err != nil {
return err
}
@@ -651,9 +698,10 @@
if meterMod == nil {
return nil
}
- log.Debug("Waiting for logical device lock!!")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debug("Acquired logical device lock")
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -686,8 +734,10 @@
if meterMod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -742,8 +792,10 @@
if meterMod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -831,8 +883,10 @@
if mod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -904,11 +958,6 @@
}
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
- log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
-
// Update model
if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
@@ -930,7 +979,17 @@
}
}
+ // Send the flows to the devices
+ respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
+ // Create the go routines to wait
+ go func() {
+ // Wait for completion
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+ log.Warnw("failure-to-add-flows", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+ // TODO : revert added flow
+ }
+ }()
}
return nil
}
@@ -970,8 +1029,10 @@
if mod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -1027,29 +1088,38 @@
}
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
- log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
-
if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
+
+ // Update the devices
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+ // Wait for the responses
+ go func() {
+ // Wait for completion
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ // TODO: Revert the flow deletion
+ }
+ }()
}
//TODO: send announcement on delete
return nil
}
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
- log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+func (agent *LogicalDeviceAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+ log.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
+ ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ defer cancel()
if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
@@ -1057,21 +1127,20 @@
response.Done()
}(deviceID, value)
}
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
+ // Return responses (an array of channels) for the caller to wait for a response from the far end.
+ return responses
}
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
- log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+ log.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
+ ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ defer cancel()
if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
@@ -1079,21 +1148,19 @@
response.Done()
}(deviceID, value)
}
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
+ return responses
}
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
- log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+func (agent *LogicalDeviceAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+ log.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
+ ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
@@ -1101,11 +1168,7 @@
response.Done()
}(deviceID, value)
}
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
+ return responses
}
//flowDeleteStrict deletes a flow from the flow table of that logical device
@@ -1114,8 +1177,10 @@
if mod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -1172,15 +1237,21 @@
}
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
- log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
-
if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
+
+ // Update the devices
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+
+ // Wait for completion
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ log.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
}
return nil
}
@@ -1200,8 +1271,10 @@
if groupMod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -1216,19 +1289,25 @@
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
log.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
- if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
- log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
- } else {
- return fmt.Errorf("Groups %d already present", groupMod.GroupId)
+
+ // Update the devices
+ respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+
+ // Wait for completion
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
+ return nil
}
- return nil
+ return fmt.Errorf("Groups %d already present", groupMod.GroupId)
}
func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
@@ -1236,8 +1315,10 @@
if groupMod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
groups := lDevice.FlowGroups.Items
@@ -1266,23 +1347,29 @@
}
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
- log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
+ if groupsChanged {
+ if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
+ log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ return err
+ }
}
- }
+ if flowsChanged {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
+ log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ return err
+ }
+ }
- if groupsChanged {
- if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
- log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
- }
- if flowsChanged {
- if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
- log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
+ // Update the devices
+ respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+
+ // Wait for completion
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
}
return nil
}
@@ -1292,8 +1379,10 @@
if groupMod == nil {
return nil
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
lDevice := agent.getLogicalDeviceWithoutLock()
groups := lDevice.FlowGroups.Items
@@ -1315,24 +1404,32 @@
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
log.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
- log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- //lDevice.FlowGroups.Items = groups
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
+
+ // Update the devices
+ respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+
+ // Wait for completion
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ log.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
}
return nil
}
// deleteLogicalPort removes the logical port
func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
logicalDevice := agent.getLogicalDeviceWithoutLock()
@@ -1368,8 +1465,10 @@
// deleteLogicalPorts removes the logical ports associated with that deviceId
func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
logicalDevice := agent.getLogicalDeviceWithoutLock()
lPortstoKeep := []*voltha.LogicalPort{}
@@ -1403,8 +1502,10 @@
// enableLogicalPort enables the logical port
func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
logicalDevice := agent.getLogicalDeviceWithoutLock()
@@ -1424,8 +1525,10 @@
// disableLogicalPort disabled the logical port
func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Get the most up to date logical device
logicalDevice := agent.getLogicalDeviceWithoutLock()
@@ -1546,9 +1649,12 @@
//rebuildRoutes rebuilds the device routes
func (agent *LogicalDeviceAgent) buildRoutes(ctx context.Context) error {
- log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ log.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
if agent.deviceRoutes == nil {
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
@@ -1568,8 +1674,11 @@
//updateRoutes updates the device routes
func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
log.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
if agent.deviceRoutes == nil {
agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
}
@@ -1673,13 +1782,15 @@
log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
return false, nil
}
- agent.lockLogicalDevice.RLock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return false, err
+ }
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- agent.lockLogicalDevice.RUnlock()
+ agent.requestQueue.RequestComplete()
return false, nil
}
- agent.lockLogicalDevice.RUnlock()
+ agent.requestQueue.RequestComplete()
var portCap *ic.PortCapability
var err error
@@ -1689,8 +1800,11 @@
return false, err
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return false, err
+ }
+
+ defer agent.requestQueue.RequestComplete()
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
@@ -1722,7 +1836,7 @@
clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
go func() {
if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- log.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
+ log.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
}
}()
@@ -1749,13 +1863,16 @@
log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
return false, nil
}
- agent.lockLogicalDevice.RLock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return false, err
+ }
+
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- agent.lockLogicalDevice.RUnlock()
+ agent.requestQueue.RequestComplete()
return false, nil
}
- agent.lockLogicalDevice.RUnlock()
+ agent.requestQueue.RequestComplete()
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -1763,8 +1880,10 @@
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
return false, err
}
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return false, err
+ }
+ defer agent.requestQueue.RequestComplete()
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})