Added a helper type (Response, in rw_core/utils) to handle async request completion more safely.

Fixes VOL-2286

Change-Id: Ifcbbfdf64c3614838adbbaa11ca69d3d49c44861
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index b7599b7..9dd41fb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -239,20 +239,20 @@
 	return nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
 	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
-		ch <- err
+		response.Error(err)
 	}
-	ch <- nil
+	response.Done()
 }
 
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
 	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceId, "error": err})
-		ch <- err
+		response.Error(err)
 	}
-	ch <- nil
+	response.Done()
 }
 
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
@@ -317,7 +317,7 @@
 	// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
 	// to send their responses.  These channels will be garbage collected once all the responses are
 	// received
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 
@@ -325,7 +325,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, response)
 
 	} else {
 		flowChanges := &ofp.FlowChanges{
@@ -337,7 +337,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
@@ -347,7 +347,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
@@ -411,14 +411,14 @@
 	}
 
 	// Send update to adapters
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -429,7 +429,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
@@ -439,7 +439,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -478,12 +478,12 @@
 			"updatedGroups": updatedGroups,
 		})
 
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -538,7 +538,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the updated data
@@ -548,7 +548,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 56db2b1..ce7ecd9 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -594,13 +594,13 @@
 		return nil
 	}
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]utils.Response, 0)
 	for _, rootDeviceId := range rootDeviceIds {
 		if rootDevice, _ := dMgr.getDeviceFromModel(rootDeviceId); rootDevice != nil {
 			if rootDevice.Adapter == adapter.Id {
 				if isOkToReconcile(rootDevice) {
 					log.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
-					chnlsList = dMgr.sendReconcileDeviceRequest(rootDevice, chnlsList)
+					responses = append(responses, dMgr.sendReconcileDeviceRequest(rootDevice))
 				} else {
 					log.Debugw("not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
 				}
@@ -612,7 +612,7 @@
 							if childDevice.Adapter == adapter.Id {
 								if isOkToReconcile(childDevice) {
 									log.Debugw("reconciling-child-device", log.Fields{"childId": childDevice.Id})
-									chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+									responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
 								} else {
 									log.Debugw("not-reconciling-child-device", log.Fields{"childId": childDevice.Id, "state": childDevice.AdminState})
 								}
@@ -627,9 +627,9 @@
 			}
 		}
 	}
-	if len(chnlsList) > 0 {
+	if len(responses) > 0 {
 		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	} else {
@@ -638,36 +638,35 @@
 	return nil
 }
 
-func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device, chnlsList []chan interface{}) []chan interface{} {
+func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device) utils.Response {
 	// Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
 	// point of creating a device agent (if the device is not being managed by this Core) before sending the request
 	// to the adapter.   We will therefore bypass the adapter adapter and send the request directly to teh adapter via
 	// the adapter_proxy.
-	ch := make(chan interface{})
-	chnlsList = append(chnlsList, ch)
+	response := utils.NewResponse()
 	go func(device *voltha.Device) {
 		if err := dMgr.adapterProxy.ReconcileDevice(context.Background(), device); err != nil {
 			log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
-			ch <- status.Errorf(codes.Internal, "device: %s", device.Id)
+			response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
 		}
-		ch <- nil
+		response.Done()
 	}(device)
 
-	return chnlsList
+	return response
 }
 
 func (dMgr *DeviceManager) reconcileChildDevices(parentDeviceId string) error {
 	if parentDevice, _ := dMgr.getDeviceFromModel(parentDeviceId); parentDevice != nil {
-		chnlsList := make([]chan interface{}, 0)
+		responses := make([]utils.Response, 0)
 		for _, port := range parentDevice.Ports {
 			for _, peer := range port.Peers {
 				if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
-					chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+					responses = append(responses, dMgr.sendReconcileDeviceRequest(childDevice))
 				}
 			}
 		}
 		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e04b4d6..3dbc2df 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -398,20 +398,20 @@
 		log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceId})
 		return err
 	} else {
-		chnlsList := make([]chan interface{}, 0)
+		responses := make([]coreutils.Response, 0)
 		for _, child := range children.Items {
-			ch := make(chan interface{})
-			chnlsList = append(chnlsList, ch)
-			go func(device *voltha.Device, ch chan interface{}) {
-				if err = agent.setupUNILogicalPorts(nil, device); err != nil {
-					log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": device.Id})
-					ch <- status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", device.Id)
+			response := coreutils.NewResponse()
+			responses = append(responses, response)
+			go func(child *voltha.Device) {
+				if err := agent.setupUNILogicalPorts(nil, child); err != nil {
+					log.Errorw("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id, "error": err})
+					response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 				}
-				ch <- nil
-			}(child, ch)
+				response.Done()
+			}(child)
 		}
 		// Wait for completion
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	}
@@ -1058,20 +1058,20 @@
 func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -1080,20 +1080,20 @@
 func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -1102,20 +1102,20 @@
 func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
-	chnlsList := make([]chan interface{}, 0)
+	responses := make([]coreutils.Response, 0)
 	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(deviceId string, value *fu.FlowsAndGroups) {
+			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
-				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
 			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
+			response.Done()
+		}(deviceId, value)
 	}
 	// Wait for completion
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index aad1348..3a71623 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -19,7 +19,6 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"os"
-	"reflect"
 	"time"
 )
 
@@ -35,62 +34,70 @@
 	return os.Getenv("HOSTNAME")
 }
 
+// Response signals completion of one async request; copies share the same state.
+type Response struct {
+	*response
+}
+
+// response holds the state shared by every copy of a Response.
+type response struct {
+	err  error         // error passed to Error; stays nil when completed via Done
+	ch   chan struct{} // closed when the request completes
+	done bool          // set once Error or Done has closed ch
+}
+
+// NewResponse returns a Response that has not completed yet.
+func NewResponse() Response {
+	return Response{&response{ch: make(chan struct{})}}
+}
+
+// Error completes the response with err.  It may only be called once; a second call panics, which is intentional.
+func (r Response) Error(err error) {
+	r.err = err
+	r.done = true
+	close(r.ch)
+}
+
+// Done completes the response without an error; it is a no-op if Error or Done has already been called.
+func (r Response) Done() {
+	if !r.done {
+		r.done = true
+		close(r.ch)
+	}
+}
+
 //WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
 //response. If an error is received from a given channel then the returned error array will contain that error.
 //The error will be at the index corresponding to the order in which the channel appear in the parameter list.
 //If no errors is found then nil is returned.  This method also takes in a timeout in milliseconds. If a
 //timeout is obtained then this function will stop waiting for the remaining responses and abort.
-func WaitForNilOrErrorResponses(timeout int64, chnls ...chan interface{}) []error {
-	if len(chnls) == 0 {
-		return nil
-	}
-	// Create a timeout channel
-	tChnl := make(chan *interface{})
-	go func() {
-		time.Sleep(time.Duration(timeout) * time.Millisecond)
-		tChnl <- nil
-	}()
+func WaitForNilOrErrorResponses(timeout int64, responses ...Response) []error {
+	timedOut := make(chan struct{})
+	timer := time.AfterFunc(time.Duration(timeout)*time.Millisecond, func() { close(timedOut) })
+	defer timer.Stop()
 
-	errorsReceived := false
-	errors := make([]error, len(chnls))
-	cases := make([]reflect.SelectCase, len(chnls)+1)
-	for i, ch := range chnls {
-		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
-	}
-	// Add the timeout channel
-	cases[len(chnls)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tChnl)}
-
-	resultsReceived := make([]bool, len(errors)+1)
-	remaining := len(cases) - 1
-	for remaining > 0 {
-		index, value, ok := reflect.Select(cases)
-		if !ok { // closed channel
-			//Set the channel at that index to nil to disable this case, hence preventing it from interfering with other cases.
-			cases[index].Chan = reflect.ValueOf(nil)
-			errors[index] = status.Error(codes.Internal, "channel closed")
-			errorsReceived = true
-		} else if index == len(chnls) { // Timeout has occurred
-			for k := range errors {
-				if !resultsReceived[k] {
-					errors[k] = status.Error(codes.Aborted, "timeout")
-				}
+	gotError := false
+	errors := make([]error, 0, len(responses))
+	for _, response := range responses {
+		var err error
+		select {
+		case <-response.ch:
+			// if a response is already available, use it
+			err = response.err
+		default:
+			// otherwise, wait for either a response or a timeout
+			select {
+			case <-response.ch:
+				err = response.err
+			case <-timedOut:
+				err = status.Error(codes.Aborted, "timeout")
 			}
-			errorsReceived = true
-			break
-		} else if value.IsNil() { // Nil means a good response
-			//do nothing
-		} else if err, ok := value.Interface().(error); ok { // error returned
-			errors[index] = err
-			errorsReceived = true
-		} else { // unknown value
-			errors[index] = status.Errorf(codes.Internal, "%s", value)
-			errorsReceived = true
 		}
-		resultsReceived[index] = true
-		remaining -= 1
+		gotError = gotError || err != nil
+		errors = append(errors, err)
 	}
 
-	if errorsReceived {
+	if gotError {
 		return errors
 	}
 	return nil
diff --git a/rw_core/utils/core_utils_test.go b/rw_core/utils/core_utils_test.go
index e0d0e75..9f8dd87 100644
--- a/rw_core/utils/core_utils_test.go
+++ b/rw_core/utils/core_utils_test.go
@@ -36,14 +36,14 @@
 	taskFailureError = status.Error(codes.Internal, "test failure task")
 }
 
-func runSuccessfulTask(ch chan interface{}, durationRange int) {
+func runSuccessfulTask(response Response, durationRange int) {
 	time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
-	ch <- nil
+	response.Done()
 }
 
-func runFailureTask(ch chan interface{}, durationRange int) {
+func runFailureTask(response Response, durationRange int) {
 	time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
-	ch <- taskFailureError
+	response.Error(taskFailureError)
 }
 
 func runMultipleTasks(timeout, numTasks, taskDurationRange, numSuccessfulTask, numFailuretask int) []error {
@@ -51,17 +51,17 @@
 		return []error{status.Error(codes.FailedPrecondition, "invalid-num-tasks")}
 	}
 	numSuccessfulTaskCreated := 0
-	chnls := make([]chan interface{}, numTasks)
+	responses := make([]Response, numTasks)
 	for i := 0; i < numTasks; i++ {
-		chnls[i] = make(chan interface{})
+		responses[i] = NewResponse()
 		if numSuccessfulTaskCreated < numSuccessfulTask {
-			go runSuccessfulTask(chnls[i], taskDurationRange)
+			go runSuccessfulTask(responses[i], taskDurationRange)
 			numSuccessfulTaskCreated += 1
 			continue
 		}
-		go runFailureTask(chnls[i], taskDurationRange)
+		go runFailureTask(responses[i], taskDurationRange)
 	}
-	return WaitForNilOrErrorResponses(int64(timeout), chnls...)
+	return WaitForNilOrErrorResponses(int64(timeout), responses...)
 }
 
 func getNumSuccessFailure(inputs []error) (numSuccess, numFailure, numTimeout int) {