Added a helper type to more safely handle async request completion.

Fixes VOL-2286

Change-Id: Ifcbbfdf64c3614838adbbaa11ca69d3d49c44861
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index b7599b7..9dd41fb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -239,20 +239,20 @@
 	return nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
 	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
-		ch <- err
+		response.Error(err)
 	}
-	ch <- nil
+	response.Done()
 }
 
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
 	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceId, "error": err})
-		ch <- err
+		response.Error(err)
 	}
-	ch <- nil
+	response.Done()
 }
 
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
@@ -317,7 +317,7 @@
 	// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
 	// to send their responses.  These channels will be garbage collected once all the responses are
 	// received
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 
@@ -325,7 +325,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, response)
 
 	} else {
 		flowChanges := &ofp.FlowChanges{
@@ -337,7 +337,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
@@ -347,7 +347,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
@@ -411,14 +411,14 @@
 	}
 
 	// Send update to adapters
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
 			return nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -429,7 +429,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
@@ -439,7 +439,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -478,12 +478,12 @@
 			"updatedGroups": updatedGroups,
 		})
 
-	chAdapters := make(chan interface{})
+	response := coreutils.NewResponse()
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -538,7 +538,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the updated data
@@ -548,7 +548,7 @@
 		return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil