[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 72b915f..85b78ef 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -24,12 +24,13 @@
"time"
"github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -104,7 +105,7 @@
var flowToReplace *ofp.OfpFlowStats
//if flow is not found in the map, create a new entry, otherwise get the existing one.
- flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
+ flowHandle, flowCreated, err := agent.flowCache.LockOrCreate(ctx, flow)
if err != nil {
return changed, updated, err
}
@@ -122,7 +123,7 @@
changed = true
}
} else {
- if !created {
+ if !flowCreated {
flowToReplace = flowHandle.GetReadOnly()
if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
flow.ByteCount = flowToReplace.ByteCount
@@ -151,9 +152,30 @@
deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, updatedFlows, groups)
if err != nil {
+ if flowCreated {
+ if er := flowHandle.Delete(ctx); er != nil {
+ logger.Errorw(ctx, "deleting-flow-from-cache-failed", log.Fields{"error": er, "flow-id": flow.Id})
+ }
+ }
return changed, updated, err
}
+ // Verify whether the flow request can proceed, usually to multiple adapters
+ // This is an optimization to address the case where a decomposed set of flows need to
+ // be sent to multiple adapters. One or more adapters may not be ready at this time.
+ // If one adapter is not ready this will result in flows being reverted from the
+ // other adapters, at times continuously as the OF controller will keep sending the
+ // flows until they are successfully added.
+ if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, deviceRules.Keys()); err != nil {
+ logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow-id": flow.Id, "error": err})
+ if flowCreated {
+ if er := flowHandle.Delete(ctx); er != nil {
+ logger.Errorw(ctx, "deleting-flow-from-cache-failed", log.Fields{"error": er, "flow-id": flow.Id})
+ }
+ }
+ return false, false, err
+ }
+
logger.Debugw(ctx, "rules", log.Fields{"rules": deviceRules.String()})
// Update store and cache
if updated {
@@ -166,7 +188,7 @@
// Create the go routines to wait
go func() {
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChannels...); res != nil {
logger.Errorw(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
"errors": res,
"logical-device-id": agent.logicalDeviceID,
@@ -237,7 +259,7 @@
// Wait for the responses
go func() {
// Since this action is taken following an add failure, we may also receive a failure for the revert
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-reverting-added-flows", log.Fields{
"logical-device-id": agent.logicalDeviceID,
"flow-cookie": mod.Cookie,
@@ -327,6 +349,18 @@
partialRoute = true
}
+ var devicesInFlows []string
+ if deviceRules != nil {
+ devicesInFlows = deviceRules.Keys()
+ } else {
+ devicesInFlows = []string{agent.rootDeviceID}
+ }
+
+ if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, devicesInFlows); err != nil {
+ logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": toDelete, "error": err})
+ return err
+ }
+
// Update the devices
if partialRoute {
respChnls = agent.deleteFlowsFromParentDevice(ctx, toDelete, mod)
@@ -337,7 +371,7 @@
// Wait for the responses
go func() {
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
context := make(map[string]string)
context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
@@ -363,6 +397,9 @@
//flowDeleteStrict deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+ var flowHandle *flow.Handle
+ var have bool
+
mod := flowUpdate.FlowMod
logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
if mod == nil {
@@ -373,13 +410,18 @@
if err != nil {
return err
}
+
+ defer func() {
+ if flowHandle != nil {
+ flowHandle.Unlock()
+ }
+ }()
+
logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
- flowHandle, have := agent.flowCache.Lock(flow.Id)
+ flowHandle, have = agent.flowCache.Lock(flow.Id)
if !have {
- logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
- return nil
+ logger.Debugw(ctx, "flow-delete-strict-request-no-flow-found-continuing", log.Fields{"flow-mod": mod})
}
- defer flowHandle.Unlock()
groups := make(map[uint32]*ofp.OfpGroupEntry)
for groupID := range agent.groupCache.ListIDs() {
@@ -389,12 +431,11 @@
}
}
- if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
- return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
+ flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flow}
+ if flowHandle != nil {
+ flowsToDelete = map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
}
- flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
-
var respChnls []coreutils.Response
var partialRoute bool
deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, flowsToDelete, groups)
@@ -408,10 +449,18 @@
partialRoute = true
}
- // Update the model
- if err := flowHandle.Delete(ctx); err != nil {
+ var devicesInFlows []string
+ if deviceRules != nil {
+ devicesInFlows = deviceRules.Keys()
+ } else {
+ devicesInFlows = []string{agent.rootDeviceID}
+ }
+
+ if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, devicesInFlows); err != nil {
+ logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": flowsToDelete, "error": err})
return err
}
+
// Update the devices
if partialRoute {
respChnls = agent.deleteFlowsFromParentDevice(ctx, flowsToDelete, mod)
@@ -420,30 +469,39 @@
}
// Wait for completion
- go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw(ctx, "failure-deleting-device-flows", log.Fields{
- "flow-cookie": mod.Cookie,
- "logical-device-id": agent.logicalDeviceID,
- "errors": res,
- })
- // TODO: Revert flow changes
- // send event, and allow any queued events to be sent as well
- agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
- context := make(map[string]string)
- context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
- context["flow-id"] = fmt.Sprintf("%v", flow.Id)
- context["flow-cookie"] = fmt.Sprintf("%v", flowUpdate.FlowMod.Cookie)
- context["logical-device-id"] = agent.logicalDeviceID
- if deviceRules != nil {
- context["device-rules"] = deviceRules.String()
- }
- // Create context and send extra information as part of it.
- agent.ldeviceMgr.SendRPCEvent(ctx,
- agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+ if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
+ logger.Warnw(ctx, "failure-deleting-device-flows", log.Fields{
+ "flow-cookie": mod.Cookie,
+ "logical-device-id": agent.logicalDeviceID,
+ "errors": res,
+ })
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = fmt.Sprintf("%v", flow.Id)
+ context["flow-cookie"] = fmt.Sprintf("%v", flowUpdate.FlowMod.Cookie)
+ context["logical-device-id"] = agent.logicalDeviceID
+ if deviceRules != nil {
+ context["device-rules"] = deviceRules.String()
}
- }()
+ // Create context and send extra information as part of it.
+ agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+
+ return status.Errorf(codes.Aborted, "failed deleting flows id:%d, errors:%v", flow.Id, res)
+ }
+
+ // Update meter count
+ if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
+ return fmt.Errorf("cannot delete flow - %s. Meter update failed", flow)
+ }
+
+ // Update the model
+ if flowHandle != nil {
+ if err := flowHandle.Delete(ctx); err != nil {
+ return err
+ }
+ }
return nil
}