[VOL-4183] Sending the right flowMetadata value to the adapters
Change-Id: Id720bc982c2f44e72c857e53f10521d8d3d1b3f8
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 9bf6361..a12bd2d 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -225,8 +225,8 @@
return agent.logicalDevice, nil
}
-func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules, "flow-metadata": flowMetadata})
+func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules) []coreutils.Response {
+ logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -235,10 +235,16 @@
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
defer cancel()
+
+ flowMeterConfig, err := agent.GetMeterConfig(ctx, value.ListFlows())
+ if err != nil {
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
+ response.Error(status.Errorf(codes.NotFound, "meter-referred-in-flow-not-present"))
+ return
+ }
start := time.Now()
- if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "flow-add-failed", log.Fields{
"device-id": deviceId,
"error": err,
@@ -255,7 +261,7 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, mod *ofp.OfpFlowMod) []coreutils.Response {
logger.Debugw(ctx, "send-delete-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -265,10 +271,16 @@
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
defer cancel()
+
+ flowMeterConfig, err := agent.GetMeterConfig(ctx, value.ListFlows())
+ if err != nil {
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
+ response.Error(status.Errorf(codes.NotFound, "meter-referred-in-flow-not-present"))
+ return
+ }
start := time.Now()
- if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "flows-and-groups-delete-failed", log.Fields{
"device-id": deviceId,
"error": err,
@@ -305,12 +317,19 @@
return responses
}
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows map[uint64]*ofp.OfpFlowStats, metadata *voltha.FlowMetadata, mod *ofp.OfpFlowMod) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows map[uint64]*ofp.OfpFlowStats, mod *ofp.OfpFlowMod) []coreutils.Response {
logger.Debugw(ctx, "deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
for _, flow := range flows {
response := coreutils.NewResponse()
responses = append(responses, response)
+
+ flowMeterConfig, err := agent.GetMeterConfig(ctx, []*ofp.OfpFlowStats{flow})
+ if err != nil {
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
+ response.Error(status.Errorf(codes.NotFound, "meter-referred-in-flow-not-present"))
+ return responses
+ }
uniPort, err := agent.getUNILogicalPortNo(flow)
if err != nil {
logger.Error(ctx, "no-uni-port-in-flow", log.Fields{"device-id": agent.rootDeviceID, "flow": flow, "error": err})
@@ -333,7 +352,7 @@
response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
}
response.Done()
- }(uniPort, metadata)
+ }(uniPort, toMetadata(flowMeterConfig))
}
return responses
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 31a8b69..f4c88fa 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -140,12 +140,6 @@
if changed {
updatedFlows := map[uint64]*ofp.OfpFlowStats{flow.Id: flow}
- flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
- if err != nil {
- logger.Error(ctx, "meter-referred-in-flow-not-present")
- return changed, updated, err
- }
-
groupIDs := agent.groupCache.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
@@ -167,7 +161,7 @@
return changed, updated, err
}
}
- respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, toMetadata(flowMeterConfig))
+ respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules)
// Create the go routines to wait
go func() {
@@ -182,7 +176,7 @@
subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
// Revert added flows
- if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+ if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules); err != nil {
logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
"error": err,
"logical-device-id": agent.logicalDeviceID,
@@ -211,8 +205,8 @@
// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
// will be reverted, both from the logical devices and the devices.
-func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules) error {
+ logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules})
flowHandle, have := agent.flowCache.Lock(addedFlow.Id)
if !have {
@@ -238,7 +232,7 @@
}
// Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata, mod)
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, mod)
// Wait for the responses
go func() {
@@ -311,12 +305,6 @@
}
}
- metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
- if err != nil { // This should never happen
- logger.Error(ctx, "meter-referred-in-flows-not-present")
- return err
- }
-
groups := make(map[uint32]*ofp.OfpGroupEntry)
for groupID := range agent.groupCache.ListIDs() {
if groupHandle, have := agent.groupCache.Lock(groupID); have {
@@ -341,9 +329,9 @@
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, toDelete, toMetadata(metersConfig), mod)
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, toDelete, mod)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, toMetadata(metersConfig), mod)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, mod)
}
// Wait for the responses
@@ -407,11 +395,6 @@
flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
- flowMetadata, err := agent.GetMeterConfig(ctx, flowsToDelete)
- if err != nil {
- logger.Error(ctx, "meter-referred-in-flows-not-present")
- return err
- }
var respChnls []coreutils.Response
var partialRoute bool
deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, flowsToDelete, groups)
@@ -431,9 +414,9 @@
}
// Update the devices
if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, flowsToDelete, toMetadata(flowMetadata), mod)
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, flowsToDelete, mod)
} else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, toMetadata(flowMetadata), mod)
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, mod)
}
// Wait for completion
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index d68ae60..c778aa5 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -87,7 +87,7 @@
logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-add": deviceRules.String()})
// Update the devices
- respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+ respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules)
// Wait for completion
go func() {
@@ -172,7 +172,7 @@
logger.Debugw(ctx, "rules", log.Fields{"rules": deviceRules.String()})
// delete groups and related flows, if any
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &voltha.FlowMetadata{}, &ofp.OfpFlowMod{})
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &ofp.OfpFlowMod{})
// Wait for completion
go func() {
diff --git a/rw_core/core/device/logical_agent_meter_helpers.go b/rw_core/core/device/logical_agent_meter_helpers.go
index f18936d..bb9de0a 100644
--- a/rw_core/core/device/logical_agent_meter_helpers.go
+++ b/rw_core/core/device/logical_agent_meter_helpers.go
@@ -26,7 +26,7 @@
)
// GetMeterConfig returns meters which which are used by the given flows
-func (agent *LogicalAgent) GetMeterConfig(ctx context.Context, flows map[uint64]*ofp.OfpFlowStats) (map[uint32]*ofp.OfpMeterConfig, error) {
+func (agent *LogicalAgent) GetMeterConfig(ctx context.Context, flows []*ofp.OfpFlowStats) (map[uint32]*ofp.OfpMeterConfig, error) {
metersConfig := make(map[uint32]*ofp.OfpMeterConfig)
for _, flow := range flows {
if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 {