Added a helper type (coreutils.Response) to handle async request completion more safely, replacing the raw per-goroutine channels.
Fixes VOL-2286
Change-Id: Ifcbbfdf64c3614838adbbaa11ca69d3d49c44861
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e04b4d6..3dbc2df 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -398,20 +398,20 @@
log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
return err
} else {
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for _, child := range children.Items {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(device *voltha.Device, ch chan interface{}) {
- if err = agent.setupUNILogicalPorts(nil, device); err != nil {
- log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": device.Id})
- ch <- status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", device.Id)
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(child *voltha.Device) {
+ if err = agent.setupUNILogicalPorts(nil, child); err != nil {
+ log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
+ response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- ch <- nil
- }(child, ch)
+ response.Done()
+ }(child)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
}
@@ -1058,20 +1058,20 @@
func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -1080,20 +1080,20 @@
func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -1102,20 +1102,20 @@
func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- chnlsList := make([]chan interface{}, 0)
+ responses := make([]coreutils.Response, 0)
for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+ response := coreutils.NewResponse()
+ responses = append(responses, response)
+ go func(deviceId string, value *fu.FlowsAndGroups) {
+ if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
- ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+ response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
+ response.Done()
+ }(deviceId, value)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil