Added a helper type to more safely handle async request completion.
Fixes VOL-2286
Change-Id: Ifcbbfdf64c3614838adbbaa11ca69d3d49c44861
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index b7599b7..9dd41fb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -239,20 +239,20 @@
return nil
}
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
- ch <- err
+ response.Error(err)
}
- ch <- nil
+ response.Done()
}
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceId, "error": err})
- ch <- err
+ response.Error(err)
}
- ch <- nil
+ response.Done()
}
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
@@ -317,7 +317,7 @@
// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
// to send their responses. These channels will be garbage collected once all the responses are
// received
- chAdapters := make(chan interface{})
+ response := coreutils.NewResponse()
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -325,7 +325,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
@@ -337,7 +337,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
}
// store the changed data
@@ -347,7 +347,7 @@
return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
return status.Errorf(codes.Aborted, "errors-%s", res)
}
@@ -411,14 +411,14 @@
}
// Send update to adapters
- chAdapters := make(chan interface{})
+ response := coreutils.NewResponse()
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -429,7 +429,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
}
// store the changed data
@@ -439,7 +439,7 @@
return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -478,12 +478,12 @@
"updatedGroups": updatedGroups,
})
- chAdapters := make(chan interface{})
+ response := coreutils.NewResponse()
dType := agent.adapterMgr.getDeviceType(device.Type)
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
} else {
var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
@@ -538,7 +538,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
}
// store the updated data
@@ -548,7 +548,7 @@
return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 56db2b1..ce7ecd9 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -594,13 +594,13 @@
return nil
}
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]utils.Response, 0)
for _, rootDeviceId := range rootDeviceIds {
if rootDevice, _ := dMgr.getDeviceFromModel(rootDeviceId); rootDevice != nil {
if rootDevice.Adapter == adapter.Id {
if isOkToReconcile(rootDevice) {
log.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
- chnlsList = dMgr.sendReconcileDeviceRequest(rootDevice, chnlsList)
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(rootDevice))
} else {
log.Debugw("not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
}
@@ -612,7 +612,7 @@
if childDevice.Adapter == adapter.Id {
if isOkToReconcile(childDevice) {
log.Debugw("reconciling-child-device", log.Fields{"childId": childDevice.Id})
- chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
} else {
log.Debugw("not-reconciling-child-device", log.Fields{"childId": childDevice.Id, "state": childDevice.AdminState})
}
@@ -627,9 +627,9 @@
}
}
}
- if len(chnlsList) > 0 {
+ if len(responses) > 0 {
// Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+ if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
} else {
@@ -638,36 +638,35 @@
return nil
}
-func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device, chnlsList []chan interface{}) []chan interface{} {
+func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device) utils.Response {
// Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
// point of creating a device agent (if the device is not being managed by this Core) before sending the request
// to the adapter. We will therefore bypass the adapter adapter and send the request directly to teh adapter via
// the adapter_proxy.
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
+ response := utils.NewResponse()
go func(device *voltha.Device) {
if err := dMgr.adapterProxy.ReconcileDevice(context.Background(), device); err != nil {
log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
- ch <- status.Errorf(codes.Internal, "device: %s", device.Id)
+ response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
}
- ch <- nil
+ response.Done()
}(device)
- return chnlsList
+ return response
}
func (dMgr *DeviceManager) reconcileChildDevices(parentDeviceId string) error {
if parentDevice, _ := dMgr.getDeviceFromModel(parentDeviceId); parentDevice != nil {
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]utils.Response, 0)
for _, port := range parentDevice.Ports {
for _, peer := range port.Peers {
if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
- chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+ responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
}
}
}
// Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+ if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e04b4d6..3dbc2df 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -398,20 +398,20 @@
log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
return err
} else {
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for _, child := range children.Items {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(device *voltha.Device, ch chan interface{}) {
- if err = agent.setupUNILogicalPorts(nil, device); err != nil {
- log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": device.Id})
- ch <- status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", device.Id)
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(child *voltha.Device) {
+ if err = agent.setupUNILogicalPorts(nil, child); err != nil {
+ log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
+ response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- ch <- nil
- }(child, ch)
+ response.Done()
+ }(child)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
}
@@ -1058,20 +1058,20 @@
func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -1080,20 +1080,20 @@
func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -1102,20 +1102,20 @@
func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil