VOL-1623-meter support and handling techprofile and fix for flow delete , now migrated to onosproject/onos:1.13.9-rc4
Change in flowupdate API towards adapters
Remove meter_get API from adapter to core
Added dependent vendor library files downloaded by "dep-ensure -update"
Added techprofile changes in the single commit
Review comments are addressed
submiting patch for integration tests for meter changes and modifications in unit test for updated flow decomposer logic
- submitting on behalf of "Salman.Siddiqui@radisys.com"
Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
Rebased
Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
- submitting on behalf of "Salman.Siddiqui@radisys.com"
pulled latest protos
verified EAPOL/DHCP/HSIA data with Edgecore OLT & TW ONT kit for one subcriber
verified delete/re-add is working end to end for the same subscriber
Change-Id: Idb232b7a0f05dc0c7e68266ac885740a3adff317
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7b9e00b..a61ca25 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -215,16 +215,16 @@
ch <- nil
}
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
ch <- err
}
ch <- nil
}
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
ch <- err
}
@@ -233,8 +233,8 @@
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
- log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
@@ -302,7 +302,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
} else {
flowChanges := &ofp.FlowChanges{
@@ -314,7 +314,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the changed data
@@ -323,6 +323,7 @@
go agent.updateDeviceWithoutLockAsync(device, chdB)
if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
return status.Errorf(codes.Aborted, "errors-%s", res)
}
@@ -331,7 +332,7 @@
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -393,7 +394,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -404,7 +405,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the changed data
@@ -421,7 +422,7 @@
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -457,7 +458,7 @@
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
} else {
var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
@@ -512,7 +513,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the updated data