VOL-1623-meter support and handling techprofile and fix for flow delete , now migrated to onosproject/onos:1.13.9-rc4
Change in flowupdate API towards adapters
Remove meter_get API from adapter to core
Added dependent vendor library files downloaded by "dep-ensure -update"
Added techprofile changes in the single commit
Review comments are addressed
submiting patch for integration tests for meter changes and modifications in unit test for updated flow decomposer logic
- submitting on behalf of "Salman.Siddiqui@radisys.com"
Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
Rebased
Load test for meter updated and other flow management test cases with meter
- Performed load test for 1K meters serially and parallely and added more TC in flow management
- submitting on behalf of "Salman.Siddiqui@radisys.com"
pulled latest protos
verified EAPOL/DHCP/HSIA data with Edgecore OLT & TW ONT kit for one subcriber
verified delete/re-add is working end to end for the same subscriber
Change-Id: Idb232b7a0f05dc0c7e68266ac885740a3adff317
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 41f71a6..9511b9d 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -418,11 +418,11 @@
return unPackResponse(rpc, deviceId, success, result)
}
-func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
- args := make([]*kafka.KVArg, 3)
+ args := make([]*kafka.KVArg, 4)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
@@ -435,6 +435,10 @@
Key: "groups",
Value: groups,
}
+ args[3] = &kafka.KVArg{
+ Key: "flow_metadata",
+ Value: flowMetadata,
+ }
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
@@ -443,7 +447,7 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("UpdateFlowsIncremental",
log.Fields{
"deviceId": device.Id,
@@ -455,7 +459,7 @@
})
toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_incrementally"
- args := make([]*kafka.KVArg, 3)
+ args := make([]*kafka.KVArg, 4)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
@@ -469,6 +473,10 @@
Value: groupChanges,
}
+ args[3] = &kafka.KVArg{
+ Key: "flow_metadata",
+ Value: flowMetadata,
+ }
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7b9e00b..a61ca25 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -215,16 +215,16 @@
ch <- nil
}
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
ch <- err
}
ch <- nil
}
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
ch <- err
}
@@ -233,8 +233,8 @@
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
- log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
@@ -302,7 +302,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, chAdapters)
} else {
flowChanges := &ofp.FlowChanges{
@@ -314,7 +314,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the changed data
@@ -323,6 +323,7 @@
go agent.updateDeviceWithoutLockAsync(device, chdB)
if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
return status.Errorf(codes.Aborted, "errors-%s", res)
}
@@ -331,7 +332,7 @@
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -393,7 +394,7 @@
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
return nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, chAdapters)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -404,7 +405,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the changed data
@@ -421,7 +422,7 @@
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -457,7 +458,7 @@
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, chAdapters)
} else {
var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
@@ -512,7 +513,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, chAdapters)
}
// store the updated data
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 606e36f..f417b05 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -671,26 +671,26 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
- log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId, "flowMetadata": flowMetadata})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.addFlowsAndGroups(flows, groups)
+ return agent.addFlowsAndGroups(flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.deleteFlowsAndGroups(flows, groups)
+ return agent.deleteFlowsAndGroups(flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) updateFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+func (dMgr *DeviceManager) updateFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateFlowsAndGroups", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateFlowsAndGroups(flows, groups)
+ return agent.updateFlowsAndGroups(flows, groups, flowMetadata)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f2c16e7..bdb3e39 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -943,13 +943,17 @@
return nil, nil
}
-//@TODO useless stub, what should this actually do?
-func (handler *APIHandler) GetMeterStatsOfLogicalDevice(
- ctx context.Context,
- in *common.ID,
-) (*openflow_13.MeterStatsReply, error) {
- log.Debug("GetMeterStatsOfLogicalDevice-stub")
- return nil, nil
+func (handler *APIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+
+ log.Debugw("ListLogicalDeviceMeters", log.Fields{"id": *id})
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+ return nil, err // TODO: Return empty meter entry
+ } else {
+ defer txn.Close()
+ }
+ }
+ return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
}
//@TODO useless stub, what should this actually do?
@@ -986,11 +990,25 @@
return successResp, nil
}
-//@TODO useless stub, what should this actually do?
-func (handler *APIHandler) UpdateLogicalDeviceMeterTable(
- ctx context.Context,
- in *openflow_13.MeterModUpdate,
-) (*empty.Empty, error) {
- log.Debug("UpdateLogicalDeviceMeterTable-stub")
- return nil, nil
+// This function sends meter mod request to logical device manager and waits for response
+func (handler *APIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+ log.Debugw("UpdateLogicalDeviceMeterTable-request",
+ log.Fields{"meter": meter, "test": common.TestModeKeys_api_test.String()})
+ if isTestMode(ctx) {
+ out := new(empty.Empty)
+ return out, nil
+ }
+
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: meter.Id}); err != nil {
+ return new(empty.Empty), err
+ } else {
+ defer txn.Close()
+ }
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.logicalDeviceMgr.updateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 70349d8..49e1463 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -45,6 +45,7 @@
deviceGraph *graph.DeviceGraph
flowProxy *model.Proxy
groupProxy *model.Proxy
+ meterProxy *model.Proxy
ldProxy *model.Proxy
portProxies map[string]*model.Proxy
portProxiesLock sync.RWMutex
@@ -139,6 +140,10 @@
ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
false)
+ agent.meterProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
+ fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceId),
+ false)
agent.groupProxy = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
@@ -199,6 +204,18 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters() (*ofp.Meters, error) {
+ log.Debug("ListLogicalDeviceMeters")
+ agent.lockLogicalDevice.RLock()
+ defer agent.lockLogicalDevice.RUnlock()
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ cMeters := (proto.Clone(lDevice.Meters)).(*ofp.Meters)
+ return cMeters, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*ofp.FlowGroups, error) {
log.Debug("ListLogicalDeviceFlowGroups")
agent.lockLogicalDevice.RLock()
@@ -261,6 +278,16 @@
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.meterProxy.Update(updateCtx, "/", meters, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-updating-logical-device-meters:%s", agent.logicalDeviceId)
+ }
+ return nil
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
@@ -520,6 +547,222 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
+// updateMeterTable updates the meter table of that logical device
+func (agent *LogicalDeviceAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+ log.Debug("updateMeterTable")
+ if meterMod == nil {
+ return nil
+ }
+ switch meterMod.GetCommand() {
+ case ofp.OfpMeterModCommand_OFPMC_ADD:
+ return agent.meterAdd(meterMod)
+ case ofp.OfpMeterModCommand_OFPMC_DELETE:
+ return agent.meterDelete(meterMod)
+ case ofp.OfpMeterModCommand_OFPMC_MODIFY:
+ return agent.meterModify(meterMod)
+ }
+ return status.Errorf(codes.Internal,
+ "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, meterMod.GetCommand())
+
+}
+
+func (agent *LogicalDeviceAgent) meterAdd(meterMod *ofp.OfpMeterMod) error {
+ log.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
+ if meterMod == nil {
+ return nil
+ }
+ log.Debug("Waiting for logical device lock!!")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ log.Debug("Acquired logical device lock")
+ var lDevice *voltha.LogicalDevice
+ var err error
+ if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+ }
+
+ var meters []*ofp.OfpMeterEntry
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
+ log.Debugw("Available meters", log.Fields{"meters": meters})
+
+ for _, meter := range meters {
+ if meterMod.MeterId == meter.Config.MeterId {
+ log.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+ return nil
+ }
+ }
+
+ meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+ meters = append(meters, meterEntry)
+ //Update model
+ if err := agent.updateLogicalDeviceMetersWithoutLock(&ofp.Meters{Items: meters}); err != nil {
+ log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+ log.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry, "updated-meters": lDevice.Meters})
+ return nil
+}
+
+func (agent *LogicalDeviceAgent) meterDelete(meterMod *ofp.OfpMeterMod) error {
+ log.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
+ if meterMod == nil {
+ return nil
+ }
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+
+ var lDevice *voltha.LogicalDevice
+ var err error
+ if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+ }
+
+ var meters []*ofp.OfpMeterEntry
+ var flows []*ofp.OfpFlowStats
+ updatedFlows := make([]*ofp.OfpFlowStats, 0)
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
+ if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+ flows = lDevice.Flows.Items
+ }
+
+ changedMeter := false
+ changedFow := false
+ log.Debugw("Available meters", log.Fields{"meters": meters})
+ for index, meter := range meters {
+ if meterMod.MeterId == meter.Config.MeterId {
+ flows = lDevice.Flows.Items
+ changedFow, updatedFlows = agent.getUpdatedFlowsAfterDeletebyMeterId(flows, meterMod.MeterId)
+ meters = append(meters[:index], meters[index+1:]...)
+ log.Debugw("Meter has been deleted", log.Fields{"meter": meter, "index": index})
+ changedMeter = true
+ break
+ }
+ }
+ if changedMeter {
+ //Update model
+ metersToUpdate := &ofp.Meters{}
+ if lDevice.Meters != nil {
+ metersToUpdate = &ofp.Meters{Items: meters}
+ }
+ if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+ log.Debug("Meter-deleted-from-DB-successfully", log.Fields{"updatedMeters": metersToUpdate, "no-of-meter": len(metersToUpdate.Items)})
+
+ }
+ if changedFow {
+ //Update model
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: updatedFlows}); err != nil {
+ log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+ log.Debug("Flows-associated-with-meter-deleted-from-DB-successfully",
+ log.Fields{"updated-no-of-flows": len(updatedFlows), "meter": meterMod.MeterId})
+ }
+ log.Debugw("meterDelete success", log.Fields{"meterID": meterMod.MeterId})
+ return nil
+}
+
+func (agent *LogicalDeviceAgent) meterModify(meterMod *ofp.OfpMeterMod) error {
+ log.Debug("meterModify")
+ if meterMod == nil {
+ return nil
+ }
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+
+ var lDevice *voltha.LogicalDevice
+ var err error
+ if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+ }
+
+ var meters []*ofp.OfpMeterEntry
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
+ changedMeter := false
+ for index, meter := range meters {
+ if meterMod.MeterId == meter.Config.MeterId {
+ newmeterEntry := fu.MeterEntryFromMeterMod(meterMod)
+ newmeterEntry.Stats.FlowCount = meter.Stats.FlowCount
+ meters[index] = newmeterEntry
+ changedMeter = true
+ log.Debugw("Found meter, replaced with new meter", log.Fields{"old meter": meter, "new meter": newmeterEntry})
+ break
+ }
+ }
+ if changedMeter {
+ //Update model
+ metersToUpdate := &ofp.Meters{}
+ if lDevice.Meters != nil {
+ metersToUpdate = &ofp.Meters{Items: meters}
+ }
+ if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+ log.Debugw("meter-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+ return nil
+ }
+
+ log.Errorw("Meter not found ", log.Fields{"meter": meterMod})
+ return errors.New(fmt.Sprintf("no-logical-device-present:%d", meterMod.MeterId))
+
+}
+
+func (agent *LogicalDeviceAgent) getUpdatedFlowsAfterDeletebyMeterId(flows []*ofp.OfpFlowStats, meterId uint32) (bool, []*ofp.OfpFlowStats) {
+ log.Infow("Delete flows matching meter", log.Fields{"meter": meterId})
+ changed := false
+ //updatedFlows := make([]*ofp.OfpFlowStats, 0)
+ for index := len(flows) - 1; index >= 0; index-- {
+ if mId := fu.GetMeterIdFromFlow(flows[index]); mId != 0 && mId == meterId {
+ log.Debugw("Flow to be deleted", log.Fields{"flow": flows[index], "index": index})
+ flows = append(flows[:index], flows[index+1:]...)
+ changed = true
+ }
+ }
+ return changed, flows
+}
+
+func (agent *LogicalDeviceAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+
+ flowCommand := modCommand.GetCommand()
+ meterId := fu.GetMeterIdFromFlow(flow)
+ log.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterId})
+ if meterId == 0 {
+ log.Debugw("No meter present in the flow", log.Fields{"flow": *flow})
+ return false
+ }
+ if meters == nil {
+ log.Debug("No meters present in logical device")
+ return false
+ }
+ changedMeter := false
+ for _, meter := range meters {
+ if meterId == meter.Config.MeterId { // Found meter in Logicaldevice
+ if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
+ meter.Stats.FlowCount += 1
+ changedMeter = true
+ } else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+ meter.Stats.FlowCount -= 1
+ changedMeter = true
+ }
+ log.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterId})
+ break
+ }
+ }
+ return changedMeter
+}
+
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
log.Debug("flowAdd")
@@ -537,12 +780,19 @@
}
var flows []*ofp.OfpFlowStats
+ var meters []*ofp.OfpMeterEntry
+ var flow *ofp.OfpFlowStats
+
if lDevice.Flows != nil && lDevice.Flows.Items != nil {
flows = lDevice.Flows.Items
}
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
updatedFlows := make([]*ofp.OfpFlowStats, 0)
changed := false
+ updated := false
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
if checkOverlap {
if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
@@ -550,13 +800,13 @@
log.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
// Add flow
- flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+ flow = fu.FlowStatsEntryFromFlowModMessage(mod)
flows = append(flows, flow)
updatedFlows = append(updatedFlows, flow)
changed = true
}
} else {
- flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+ flow = fu.FlowStatsEntryFromFlowModMessage(mod)
idx := fu.FindFlows(flows, flow)
if idx >= 0 {
oldFlow := flows[idx]
@@ -568,6 +818,7 @@
flows[idx] = flow
updatedFlows = append(updatedFlows, flow)
changed = true
+ updated = true
}
} else {
flows = append(flows, flow)
@@ -576,10 +827,15 @@
}
}
if changed {
+ var flowMetadata voltha.FlowMetadata
+ if err := agent.GetMeterConfig(updatedFlows, meters, &flowMetadata); err != nil { // This should never happen,meters should be installed before flow arrives
+ log.Error("Meter-referred-in-flows-not-present")
+ return err
+ }
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.addDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
@@ -589,10 +845,55 @@
log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
+ if !updated {
+ changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow)
+ metersToUpdate := &ofp.Meters{}
+ if lDevice.Meters != nil {
+ metersToUpdate = &ofp.Meters{Items: meters}
+ }
+ if changedMeterStats {
+ //Update model
+ if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+ log.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
+
+ }
+ }
+
}
return nil
}
+func (agent *LogicalDeviceAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+ m := make(map[uint32]bool)
+ for _, flow := range flows {
+ if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && m[flowMeterID] == false {
+ foundMeter := false
+ // Meter is present in the flow , Get from logical device
+ for _, meter := range meters {
+ if flowMeterID == meter.Config.MeterId {
+ metadata.Meters = append(metadata.Meters, meter.Config)
+ log.Debugw("Found meter in logical device",
+ log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
+ m[flowMeterID] = true
+ foundMeter = true
+ break
+ }
+ }
+ if !foundMeter {
+ log.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
+ log.Fields{"meterID": flowMeterID, "Avaliable-meters": meters, "flow": *flow})
+ return errors.New("Meter-referred-by-flow-is-not-found-in-logicaldevice")
+ }
+ }
+ }
+ log.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
+ return nil
+
+}
+
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
log.Debug("flowDelete")
@@ -608,8 +909,17 @@
log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
}
- flows := lDevice.Flows.Items
+ var meters []*ofp.OfpMeterEntry
+ var flows []*ofp.OfpFlowStats
+
+ if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+ flows = lDevice.Flows.Items
+ }
+
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
//build a list of what to keep vs what to delete
toKeep := make([]*ofp.OfpFlowStats, 0)
toDelete := make([]*ofp.OfpFlowStats, 0)
@@ -631,10 +941,15 @@
//Update flows
if len(toDelete) > 0 {
+ var flowMetadata voltha.FlowMetadata
+ if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+ log.Error("Meter-referred-in-flows-not-present")
+ return errors.New("Meter-referred-in-flows-not-present")
+ }
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
@@ -649,15 +964,15 @@
return nil
}
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
- log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
chnlsList := make([]chan interface{}, 0)
for deviceId, value := range deviceRules.GetRules() {
ch := make(chan interface{})
chnlsList = append(chnlsList, ch)
go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+ if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
}
@@ -671,7 +986,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
chnlsList := make([]chan interface{}, 0)
@@ -679,7 +994,7 @@
ch := make(chan interface{})
chnlsList = append(chnlsList, ch)
go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups); err != nil {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
}
@@ -693,7 +1008,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
chnlsList := make([]chan interface{}, 0)
@@ -701,7 +1016,7 @@
ch := make(chan interface{})
chnlsList = append(chnlsList, ch)
go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups); err != nil {
+ if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups, flowMetadata); err != nil {
log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
}
@@ -730,22 +1045,50 @@
log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
}
- flows := lDevice.Flows.Items
- changed := false
+ var meters []*ofp.OfpMeterEntry
+ var flows []*ofp.OfpFlowStats
+ if lDevice.Meters != nil && lDevice.Meters.Items != nil {
+ meters = lDevice.Meters.Items
+ }
+ if lDevice.Flows != nil && lDevice.Flows.Items != nil {
+ flows = lDevice.Flows.Items
+ }
+
+ changedFlow := false
+ changedMeter := false
flow := fu.FlowStatsEntryFromFlowModMessage(mod)
+ flowsToDelete := make([]*ofp.OfpFlowStats, 0)
idx := fu.FindFlows(flows, flow)
if idx >= 0 {
+ changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flow)
+ flowsToDelete = append(flowsToDelete, flows[idx])
flows = append(flows[:idx], flows[idx+1:]...)
- changed = true
+ changedFlow = true
} else {
return errors.New(fmt.Sprintf("Cannot delete flow - %s", flow))
}
+ if changedMeter {
+ //Update model
+ metersToUpdate := &ofp.Meters{}
+ if lDevice.Meters != nil {
+ metersToUpdate = &ofp.Meters{Items: meters}
+ }
+ if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+ log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
- if changed {
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: []*ofp.OfpFlowStats{flow}}, ofp.FlowGroups{})
+ }
+ if changedFlow {
+ var flowMetadata voltha.FlowMetadata
+ if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+ log.Error("Meter-referred-in-flows-not-present")
+ return err
+ }
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
@@ -788,8 +1131,7 @@
deviceRules := agent.flowDecomposer.DecomposeRules(agent, *lDevice.Flows, ofp.FlowGroups{Items: groups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.addDeviceFlowsAndGroups(deviceRules, nil); err != nil {
log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
@@ -841,7 +1183,7 @@
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
@@ -891,7 +1233,7 @@
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: lDevice.Flows.Items}, ofp.FlowGroups{Items: groups})
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+ if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index a097736..71843ff 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -476,6 +476,25 @@
sendAPIResponse(ctx, ch, res)
}
+func (ldMgr *LogicalDeviceManager) updateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
+ log.Debugw("updateMeterTable", log.Fields{"logicalDeviceId": id})
+ var res interface{}
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ res = agent.updateMeterTable(ctx, meter)
+ log.Debugw("updateMeterTable-result", log.Fields{"result": res})
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", id)
+ }
+ sendAPIResponse(ctx, ch, res)
+}
+
+func (ldMgr *LogicalDeviceManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
+ log.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.ListLogicalDeviceMeters()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
+}
func (ldMgr *LogicalDeviceManager) updateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
log.Debugw("updateGroupTable", log.Fields{"logicalDeviceId": id})
var res interface{}