Fix delete-device hangs.
Don't wait for adapter responses to flow-update operations while holding the device lock.
Amendments:
Return an already-completed response (DoneResponse) instead of nil on early-exit paths.
Fix ro_core docker builds
Change-Id: I04228c7bee5cf83c493d885c6751fd911a32c4f6
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0f4c177..1cc1673 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -257,14 +257,12 @@
response.Done()
}
-//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
-//adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
agent.lockDevice.Lock()
@@ -273,7 +271,7 @@
var device *voltha.Device
var err error
if device, err = agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.NotFound, "%s", agent.deviceID)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -307,7 +305,7 @@
// Sanity check
if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
// Send update to adapters
@@ -321,7 +319,7 @@
if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, response)
@@ -342,25 +340,32 @@
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
if err := agent.updateDeviceWithoutLock(device); err != nil {
- return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
+ return response, nil
+}
+
+//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
+//adapters
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.addFlowsAndGroupsToAdapter(newFlows, newGroups, flowMetadata)
+ if err != nil {
+ return err
+ }
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
return status.Errorf(codes.Aborted, "errors-%s", res)
}
-
return nil
}
-//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
-//adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
- return nil
+ return coreutils.DoneResponse(), nil
}
agent.lockDevice.Lock()
@@ -370,7 +375,7 @@
var err error
if device, err = agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.NotFound, "%s", agent.deviceID)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -405,7 +410,7 @@
// Sanity check
if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
- return nil
+ return coreutils.DoneResponse(), nil
}
// Send update to adapters
@@ -414,7 +419,7 @@
if !dType.AcceptsAddRemoveFlowUpdates {
if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
- return nil
+ return coreutils.DoneResponse(), nil
}
go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
} else {
@@ -434,24 +439,32 @@
device.Flows = &voltha.Flows{Items: flowsToKeep}
device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
if err := agent.updateDeviceWithoutLock(device); err != nil {
- return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
+ return response, nil
+
+}
+
+//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
+//adapters
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.deleteFlowsAndGroupsFromAdapter(flowsToDel, groupsToDel, flowMetadata)
+ if err != nil {
+ return err
+ }
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
-
}
-//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
-//also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
agent.lockDevice.Lock()
@@ -459,14 +472,14 @@
var device *voltha.Device
var err error
if device, err = agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.NotFound, "%s", agent.deviceID)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
log.Debugw("updating-flows-and-groups",
@@ -524,7 +537,7 @@
// Sanity check
if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return nil
+ return coreutils.DoneResponse(), nil
}
flowChanges := &ofp.FlowChanges{
@@ -543,9 +556,19 @@
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
if err := agent.updateDeviceWithoutLock(device); err != nil {
- return status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
+ return response, nil
+}
+
+//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
+//also sends the updates to the adapters
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ response, err := agent.updateFlowsAndGroupsToAdapter(updatedFlows, updatedGroups, flowMetadata)
+ if err != nil {
+ return err
+ }
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 584d923..82465ef 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -58,6 +58,19 @@
}
}
+// DoneResponse returns an already-completed Response (nil error, done set, channel closed).
+func DoneResponse() Response {
+ r := Response{
+ &response{
+ err: nil,
+ ch: make(chan struct{}),
+ done: true,
+ },
+ }
+ close(r.ch) // receives on a closed channel never block, so waiting on ch returns immediately
+ return r
+}
+
// Error sends a response with the given error. It may only be called once.
func (r Response) Error(err error) {
// if this is called twice, it will panic; this is intentional