[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 72b915f..85b78ef 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -24,12 +24,13 @@
 	"time"
 
 	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/rw_core/core/device/flow"
 	"github.com/opencord/voltha-go/rw_core/route"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -104,7 +105,7 @@
 	var flowToReplace *ofp.OfpFlowStats
 
 	//if flow is not found in the map, create a new entry, otherwise get the existing one.
-	flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
+	flowHandle, flowCreated, err := agent.flowCache.LockOrCreate(ctx, flow)
 	if err != nil {
 		return changed, updated, err
 	}
@@ -122,7 +123,7 @@
 			changed = true
 		}
 	} else {
-		if !created {
+		if !flowCreated {
 			flowToReplace = flowHandle.GetReadOnly()
 			if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
 				flow.ByteCount = flowToReplace.ByteCount
@@ -151,9 +152,30 @@
 
 		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, updatedFlows, groups)
 		if err != nil {
+			if flowCreated {
+				if er := flowHandle.Delete(ctx); er != nil {
+					logger.Errorw(ctx, "deleting-flow-from-cache-failed", log.Fields{"error": er, "flow-id": flow.Id})
+				}
+			}
 			return changed, updated, err
 		}
 
+		// Verify whether the flow request can proceed, usually to multiple adapters
+		// This is an optimization to address the case where a decomposed set of flows need to
+		// be sent to multiple adapters.  One or more adapters may not be ready at this time.
+		// If one adapter is not ready this will result in flows being reverted from the
+		// other adapters, at times continuously as the OF controller will keep sending the
+		// flows until they are successfully added.
+		if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, deviceRules.Keys()); err != nil {
+			logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow-id": flow.Id, "error": err})
+			if flowCreated {
+				if er := flowHandle.Delete(ctx); er != nil {
+					logger.Errorw(ctx, "deleting-flow-from-cache-failed", log.Fields{"error": er, "flow-id": flow.Id})
+				}
+			}
+			return false, false, err
+		}
+
 		logger.Debugw(ctx, "rules", log.Fields{"rules": deviceRules.String()})
 		//	Update store and cache
 		if updated {
@@ -166,7 +188,7 @@
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChannels...); res != nil {
 				logger.Errorw(ctx, "failed-to-add-flow-will-attempt-deletion", log.Fields{
 					"errors":            res,
 					"logical-device-id": agent.logicalDeviceID,
@@ -237,7 +259,7 @@
 	// Wait for the responses
 	go func() {
 		// Since this action is taken following an add failure, we may also receive a failure for the revert
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+		if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
 			logger.Warnw(ctx, "failure-reverting-added-flows", log.Fields{
 				"logical-device-id": agent.logicalDeviceID,
 				"flow-cookie":       mod.Cookie,
@@ -327,6 +349,18 @@
 			partialRoute = true
 		}
 
+		var devicesInFlows []string
+		if deviceRules != nil {
+			devicesInFlows = deviceRules.Keys()
+		} else {
+			devicesInFlows = []string{agent.rootDeviceID}
+		}
+
+		if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, devicesInFlows); err != nil {
+			logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": toDelete, "error": err})
+			return err
+		}
+
 		// Update the devices
 		if partialRoute {
 			respChnls = agent.deleteFlowsFromParentDevice(ctx, toDelete, mod)
@@ -337,7 +371,7 @@
 		// Wait for the responses
 		go func() {
 			// Wait for completion
-			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
 				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
 				context := make(map[string]string)
 				context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
@@ -363,6 +397,9 @@
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
+	var flowHandle *flow.Handle
+	var have bool
+
 	mod := flowUpdate.FlowMod
 	logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
 	if mod == nil {
@@ -373,13 +410,18 @@
 	if err != nil {
 		return err
 	}
+
+	defer func() {
+		if flowHandle != nil {
+			flowHandle.Unlock()
+		}
+	}()
+
 	logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
-	flowHandle, have := agent.flowCache.Lock(flow.Id)
+	flowHandle, have = agent.flowCache.Lock(flow.Id)
 	if !have {
-		logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
-		return nil
+		logger.Debugw(ctx, "flow-delete-strict-request-no-flow-found-continuing", log.Fields{"flow-mod": mod})
 	}
-	defer flowHandle.Unlock()
 
 	groups := make(map[uint32]*ofp.OfpGroupEntry)
 	for groupID := range agent.groupCache.ListIDs() {
@@ -389,12 +431,11 @@
 		}
 	}
 
-	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
-		return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
+	flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flow}
+	if flowHandle != nil {
+		flowsToDelete = map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
 	}
 
-	flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
-
 	var respChnls []coreutils.Response
 	var partialRoute bool
 	deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, flowsToDelete, groups)
@@ -408,10 +449,18 @@
 		partialRoute = true
 	}
 
-	// Update the model
-	if err := flowHandle.Delete(ctx); err != nil {
+	var devicesInFlows []string
+	if deviceRules != nil {
+		devicesInFlows = deviceRules.Keys()
+	} else {
+		devicesInFlows = []string{agent.rootDeviceID}
+	}
+
+	if err := agent.deviceMgr.canMultipleAdapterRequestProceed(ctx, devicesInFlows); err != nil {
+		logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": flowsToDelete, "error": err})
 		return err
 	}
+
 	// Update the devices
 	if partialRoute {
 		respChnls = agent.deleteFlowsFromParentDevice(ctx, flowsToDelete, mod)
@@ -420,30 +469,39 @@
 	}
 
 	// Wait for completion
-	go func() {
-		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-			logger.Warnw(ctx, "failure-deleting-device-flows", log.Fields{
-				"flow-cookie":       mod.Cookie,
-				"logical-device-id": agent.logicalDeviceID,
-				"errors":            res,
-			})
-			// TODO: Revert flow changes
-			// send event, and allow any queued events to be sent as well
-			agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
-			context := make(map[string]string)
-			context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
-			context["flow-id"] = fmt.Sprintf("%v", flow.Id)
-			context["flow-cookie"] = fmt.Sprintf("%v", flowUpdate.FlowMod.Cookie)
-			context["logical-device-id"] = agent.logicalDeviceID
-			if deviceRules != nil {
-				context["device-rules"] = deviceRules.String()
-			}
-			// Create context and send extra information as part of it.
-			agent.ldeviceMgr.SendRPCEvent(ctx,
-				agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+	if res := coreutils.WaitForNilOrErrorResponses(agent.internalTimeout, respChnls...); res != nil {
+		logger.Warnw(ctx, "failure-deleting-device-flows", log.Fields{
+			"flow-cookie":       mod.Cookie,
+			"logical-device-id": agent.logicalDeviceID,
+			"errors":            res,
+		})
+		context := make(map[string]string)
+		context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+		context["flow-id"] = fmt.Sprintf("%v", flow.Id)
+		context["flow-cookie"] = fmt.Sprintf("%v", flowUpdate.FlowMod.Cookie)
+		context["logical-device-id"] = agent.logicalDeviceID
+		if deviceRules != nil {
+			context["device-rules"] = deviceRules.String()
 		}
-	}()
+		// Create context and send extra information as part of it.
+		agent.ldeviceMgr.SendRPCEvent(ctx,
+			agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+			voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+
+		return status.Errorf(codes.Aborted, "failed deleting flows id:%d, errors:%v", flow.Id, res)
+	}
+
+	// Update meter count
+	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
+		return fmt.Errorf("cannot delete flow - %s. Meter update failed", flow)
+	}
+
+	// Update the model
+	if flowHandle != nil {
+		if err := flowHandle.Delete(ctx); err != nil {
+			return err
+		}
+	}
 
 	return nil
 }