Added a helper type (coreutils.Response) to more safely handle async request completion, replacing the raw per-goroutine `chan interface{}` signalling.

Fixes VOL-2286

Change-Id: Ifcbbfdf64c3614838adbbaa11ca69d3d49c44861
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e04b4d6..3dbc2df 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -398,20 +398,20 @@
 		log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
 		return err
 	} else {
-		chnlsList := make([]chan interface{}, 0)
+		responses := make([]coreutils.Response, 0)
 		for _, child := range children.Items {
-			ch := make(chan interface{})
-			chnlsList = append(chnlsList, ch)
-			go func(device *voltha.Device, ch chan interface{}) {
-				if err = agent.setupUNILogicalPorts(nil, device); err != nil {
-					log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": device.Id})
-					ch <- status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", device.Id)
+			response := coreutils.NewResponse()
+			responses = append(responses, response)
+			go func(child *voltha.Device) {
+				if err = agent.setupUNILogicalPorts(nil, child); err != nil {
+					log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
+					response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 				}
-				ch <- nil
-			}(child, ch)
+				response.Done()
+			}(child)
 		}
 		// Wait for completion
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	}
@@ -1058,20 +1058,20 @@
 func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -1080,20 +1080,20 @@
 func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -1102,20 +1102,20 @@
 func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil