Removed updateDeviceWithoutLockAsync to avoid a potential lockless write issue.
Fixes VOL-2181
Change-Id: I7db6e87370017dea7552432e7777396bd4ca1a7a
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index e01d419..b7599b7 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -239,13 +239,6 @@
return nil
}
-func (agent *DeviceAgent) updateDeviceWithoutLockAsync(device *voltha.Device, ch chan interface{}) {
- if err := agent.updateDeviceWithoutLock(device); err != nil {
- ch <- status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
- }
- ch <- nil
-}
-
func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
@@ -325,7 +318,6 @@
// to send their responses. These channels will be garbage collected once all the responses are
// received
chAdapters := make(chan interface{})
- chdB := make(chan interface{})
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -351,9 +343,11 @@
// store the changed data
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- go agent.updateDeviceWithoutLockAsync(device, chdB)
+ if err := agent.updateDeviceWithoutLock(device); err != nil {
+ return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
+ }
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
return status.Errorf(codes.Aborted, "errors-%s", res)
}
@@ -418,7 +412,6 @@
// Send update to adapters
chAdapters := make(chan interface{})
- chdB := make(chan interface{})
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
@@ -442,9 +435,11 @@
// store the changed data
device.Flows = &voltha.Flows{Items: flowsToKeep}
device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
- go agent.updateDeviceWithoutLockAsync(device, chdB)
+ if err := agent.updateDeviceWithoutLock(device); err != nil {
+ return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
+ }
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -484,7 +479,6 @@
})
chAdapters := make(chan interface{})
- chdB := make(chan interface{})
dType := agent.adapterMgr.getDeviceType(device.Type)
// Process bulk flow update differently than incremental update
@@ -550,9 +544,11 @@
// store the updated data
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- go agent.updateDeviceWithoutLockAsync(device, chdB)
+ if err := agent.updateDeviceWithoutLock(device); err != nil {
+ return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
+ }
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil