VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter Lock() call, and only allowing access through this handle.

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters which are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders.  In addition, flows/groups/meters which are given to the loaders MUST NOT be modified afterward.
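
As a rough sketch of the intended access pattern (method names follow the flow loader in the diff below; error handling and the surrounding function are illustrative only):

	// Create-or-update path: the handle is held for the duration of the update,
	// and everything read through it is shared, not a copy.
	flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
	if err != nil {
		return err
	}
	defer flowHandle.Unlock()

	if !created {
		previous := flowHandle.GetReadOnly() // read-only shared instance, never modify
		flow.ByteCount = previous.ByteCount  // e.g. carry counters over to the new flow
	}
	if err := flowHandle.Update(ctx, flow); err != nil { // persists to cache and kv
		return err
	}

The read path is the same Lock()/GetReadOnly()/Unlock() sequence, as in listLogicalDeviceFlows in the diff.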

There remain many cases where an error during a particular kv update can leave the store in an inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.
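
Purely as an illustration of what that could look like (the key names and helper below are hypothetical, and voltha-lib-go does not expose this today), etcd's clientv3 API can write several keys in one transaction:

	import (
		"context"
		"fmt"

		clientv3 "go.etcd.io/etcd/clientv3"
	)

	// atomicPut is a hypothetical helper: persist a flow and its meter stats in a
	// single etcd transaction, so neither can be written without the other.
	func atomicPut(ctx context.Context, cli *clientv3.Client, flowKey, flowVal, meterKey, meterVal string) error {
		resp, err := cli.Txn(ctx).
			Then(clientv3.OpPut(flowKey, flowVal), clientv3.OpPut(meterKey, meterVal)).
			Commit()
		if err != nil {
			return err
		}
		if !resp.Succeeded {
			return fmt.Errorf("kv-transaction-not-applied")
		}
		return nil
	}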

There is also the risk that locking a flow/meter/group while another flow/meter/group lock is already held could cause a deadlock.  This can be avoided by acquiring locks in a consistent order; something to keep in mind while fixing the previous issue (see the sketch below).
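
One possible convention (not implemented here; handle/loader type names below are assumed): take locks of different kinds in a fixed order (e.g. flow, then group, then meter), and when several locks of the same kind are needed, take them in ascending ID order:

	// Hypothetical sketch: lock a set of flows in ascending ID order (using the
	// standard sort package) so two callers needing overlapping sets can never
	// end up waiting on each other's locks.
	flowIDs := agent.flowLoader.List()
	ids := make([]uint64, 0, len(flowIDs))
	for id := range flowIDs {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	handles := make([]*flow.Handle, 0, len(ids))
	defer func() {
		for _, h := range handles {
			h.Unlock()
		}
	}()
	for _, id := range ids {
		if h, have := agent.flowLoader.Lock(id); have {
			handles = append(handles, h)
		}
	}
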
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 0e811e7..b06b0f7 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -32,6 +32,19 @@
 	"strconv"
 )
 
+// listLogicalDeviceFlows returns logical device flows
+func (agent *LogicalAgent) listLogicalDeviceFlows() map[uint64]*ofp.OfpFlowStats {
+	flowIDs := agent.flowLoader.List()
+	flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
+	for flowID := range flowIDs {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flows[flowID] = flowHandle.GetReadOnly()
+			flowHandle.Unlock()
+		}
+	}
+	return flows
+}
+
 //updateFlowTable updates the flow table of that logical device
 func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
 	logger.Debug("UpdateFlowTable")
@@ -84,54 +97,29 @@
 func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
 	changed := false
 	updated := false
-	alreadyExist := true
 	var flowToReplace *ofp.OfpFlowStats
 
 	//if flow is not found in the map, create a new entry, otherwise get the existing one.
-	agent.flowLock.Lock()
-	flowChunk, ok := agent.flows[flow.Id]
-	if !ok {
-		flowChunk = &FlowChunk{
-			flow: flow,
-		}
-		agent.flows[flow.Id] = flowChunk
-		alreadyExist = false
-		flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
-		defer flowChunk.lock.Unlock()
-		agent.flowLock.Unlock()
-	} else {
-		agent.flowLock.Unlock() //release map lock before acquiring chunk lock
-		flowChunk.lock.Lock()
-		defer flowChunk.lock.Unlock()
+	flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+	if err != nil {
+		return changed, updated, err
 	}
+	defer flowHandle.Unlock()
 
-	if !alreadyExist {
-		flowID := strconv.FormatUint(flow.Id, 10)
-		if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
-			logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
-			//Revert the map
-			//TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
-			//then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
-			agent.flowLock.Lock()
-			delete(agent.flows, flow.Id)
-			agent.flowLock.Unlock()
-			return changed, updated, err
-		}
-	}
 	flows := make([]*ofp.OfpFlowStats, 0)
-	updatedFlows := make([]*ofp.OfpFlowStats, 0)
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
+		// TODO: this currently does nothing
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
-			//	TODO:  should this error be notified other than being logged?
+			// TODO: should this error be notified other than being logged?
 			logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
 		} else {
 			//	Add flow
 			changed = true
 		}
 	} else {
-		if alreadyExist {
-			flowToReplace = flowChunk.flow
+		if !created {
+			flowToReplace = flowHandle.GetReadOnly()
 			if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
 				flow.ByteCount = flowToReplace.ByteCount
 				flow.PacketCount = flowToReplace.PacketCount
@@ -146,15 +134,24 @@
 	}
 	logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
 	if changed {
-		updatedFlows = append(updatedFlows, flow)
-		var flowMetadata voltha.FlowMetadata
-		lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
-		if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+		updatedFlows := map[uint64]*ofp.OfpFlowStats{flow.Id: flow}
+
+		flowMeterConfig, err := agent.GetMeterConfig(updatedFlows)
+		if err != nil {
 			logger.Error("Meter-referred-in-flow-not-present")
 			return changed, updated, err
 		}
-		flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
-		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
+
+		groupIDs := agent.groupLoader.List()
+		groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+		for groupID := range groupIDs {
+			if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+				groups[groupID] = groupHandle.GetReadOnly()
+				groupHandle.Unlock()
+			}
+		}
+
+		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, updatedFlows, groups)
 		if err != nil {
 			return changed, updated, err
 		}
@@ -162,19 +159,18 @@
 		logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 		//	Update store and cache
 		if updated {
-			if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
+			if err := flowHandle.Update(ctx, flow); err != nil {
 				return changed, updated, err
 			}
 		}
-
-		respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, &flowMetadata)
+		respChannels := agent.addFlowsAndGroupsToDevices(deviceRules, toMetadata(flowMeterConfig))
 		// Create the go routines to wait
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
 				logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
 				// Revert added flows
-				if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
+				if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
 					logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 				}
 			}
@@ -188,27 +184,24 @@
 func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
 	logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
 
-	agent.flowLock.RLock()
-	flowChunk, ok := agent.flows[addedFlow.Id]
-	agent.flowLock.RUnlock()
-	if !ok {
+	flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
+	if !have {
 		// Not found - do nothing
 		log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
 		return nil
 	}
-	//Leave the map lock and syncronize per flow
-	flowChunk.lock.Lock()
-	defer flowChunk.lock.Unlock()
+	defer flowHandle.Unlock()
 
 	if replacedFlow != nil {
-		if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
+		if err := flowHandle.Update(ctx, replacedFlow); err != nil {
 			return err
 		}
 	} else {
-		if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
+		if err := flowHandle.Delete(ctx); err != nil {
 			return err
 		}
 	}
+
 	// Revert meters
 	if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
 		return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
@@ -235,56 +228,72 @@
 		return nil
 	}
 
+	//build a list of what to delete
+	toDelete := make(map[uint64]*ofp.OfpFlowStats)
+
+	// add perfectly matching entry if exists
 	fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
 	if err != nil {
 		return err
 	}
+	if handle, have := agent.flowLoader.Lock(fs.Id); have {
+		toDelete[fs.Id] = handle.GetReadOnly()
+		handle.Unlock()
+	}
 
-	//build a list of what to delete
-	toDelete := make([]*ofp.OfpFlowStats, 0)
-	toDeleteChunks := make([]*FlowChunk, 0)
-	//Lock the map to search the matched flows
-	agent.flowLock.RLock()
-	for _, f := range agent.flows {
-		if fu.FlowMatch(f.flow, fs) {
-			toDelete = append(toDelete, f.flow)
-			toDeleteChunks = append(toDeleteChunks, f)
-			continue
-		}
-		// Check wild card match
-		if fu.FlowMatchesMod(f.flow, mod) {
-			toDelete = append(toDelete, f.flow)
-			toDeleteChunks = append(toDeleteChunks, f)
+	// search through all the flows
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			if flow := flowHandle.GetReadOnly(); fu.FlowMatchesMod(flow, mod) {
+				toDelete[flow.Id] = flow
+			}
+			flowHandle.Unlock()
 		}
 	}
-	agent.flowLock.RUnlock()
+
 	//Delete the matched flows
 	if len(toDelete) > 0 {
 		logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
-		var meters []*ofp.OfpMeterEntry
-		var flowGroups []*ofp.OfpGroupEntry
-		if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
-			meters = ofpMeters.Items
-		}
 
-		if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
-			flowGroups = groups.Items
-		}
-
-		for _, fc := range toDeleteChunks {
-			if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
-				return err
+		for _, flow := range toDelete {
+			if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+				// TODO: Flow should only be updated if meter is updated, and meter should only be updated if flow is updated
+				//       currently an error while performing the second operation will leave an inconsistent state in kv.
+				//       This should be a single atomic operation down to the kv.
+				if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flowHandle.GetReadOnly(), false); !changedMeter {
+					flowHandle.Unlock()
+					return fmt.Errorf("cannot-delete-flow-%d. Meter-update-failed", flow.Id)
+				}
+				// Update store and cache
+				if err := flowHandle.Delete(ctx); err != nil {
+					flowHandle.Unlock()
+					return fmt.Errorf("cannot-delete-flows-%d. Delete-from-store-failed", flow.Id)
+				}
+				flowHandle.Unlock()
+				// TODO: since this is executed in a loop without also updating meter stats, an error part way through this
+				//       operation will leave inconsistent state in the meter stats & flows on the devices.
+				//       This & related meter updates should be a single atomic operation down to the kv.
 			}
 		}
-		var flowMetadata voltha.FlowMetadata
-		if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+
+		metersConfig, err := agent.GetMeterConfig(toDelete)
+		if err != nil { // This should never happen
 			logger.Error("Meter-referred-in-flows-not-present")
 			return err
 		}
+
+		groups := make(map[uint32]*ofp.OfpGroupEntry)
+		for groupID := range agent.groupLoader.List() {
+			if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+				groups[groupID] = groupHandle.GetReadOnly()
+				groupHandle.Unlock()
+			}
+		}
+
 		var respChnls []coreutils.Response
 		var partialRoute bool
 		var deviceRules *fu.DeviceRules
-		deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+		deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, toDelete, groups)
 		if err != nil {
 			// A no route error means no route exists between the ports specified in the flow. This can happen when the
 			// child device is deleted and a request to delete flows from the parent device is received
@@ -297,9 +306,9 @@
 
 		// Update the devices
 		if partialRoute {
-			respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: toDelete}, &flowMetadata)
+			respChnls = agent.deleteFlowsFromParentDevice(toDelete, toMetadata(metersConfig))
 		} else {
-			respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
+			respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, toMetadata(metersConfig))
 		}
 
 		// Wait for the responses
@@ -327,38 +336,35 @@
 		return err
 	}
 	logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
-	agent.flowLock.RLock()
-	flowChunk, ok := agent.flows[flow.Id]
-	agent.flowLock.RUnlock()
-	if !ok {
+	flowHandle, have := agent.flowLoader.Lock(flow.Id)
+	if !have {
 		logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
 		return nil
 	}
-	//Release the map lock and syncronize per flow
-	flowChunk.lock.Lock()
-	defer flowChunk.lock.Unlock()
+	defer flowHandle.Unlock()
 
-	var meters []*ofp.OfpMeterEntry
-	var flowGroups []*ofp.OfpGroupEntry
-	if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
-		meters = ofMeters.Items
+	groups := make(map[uint32]*ofp.OfpGroupEntry)
+	for groupID := range agent.groupLoader.List() {
+		if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+			groups[groupID] = groupHandle.GetReadOnly()
+			groupHandle.Unlock()
+		}
 	}
-	if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
-		flowGroups = ofGroups.Items
-	}
+
 	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
 		return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
 	}
 
-	var flowMetadata voltha.FlowMetadata
-	flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
-	if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+	flowsToDelete := map[uint64]*ofp.OfpFlowStats{flow.Id: flowHandle.GetReadOnly()}
+
+	flowMetadata, err := agent.GetMeterConfig(flowsToDelete)
+	if err != nil {
 		logger.Error("meter-referred-in-flows-not-present")
 		return err
 	}
 	var respChnls []coreutils.Response
 	var partialRoute bool
-	deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+	deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, flowsToDelete, groups)
 	if err != nil {
 		// A no route error means no route exists between the ports specified in the flow. This can happen when the
 		// child device is deleted and a request to delete flows from the parent device is received
@@ -370,14 +376,14 @@
 	}
 
 	// Update the model
-	if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
+	if err := flowHandle.Delete(ctx); err != nil {
 		return err
 	}
 	// Update the devices
 	if partialRoute {
-		respChnls = agent.deleteFlowsFromParentDevice(ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+		respChnls = agent.deleteFlowsFromParentDevice(flowsToDelete, toMetadata(flowMetadata))
 	} else {
-		respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, &flowMetadata)
+		respChnls = agent.deleteFlowsAndGroupsFromDevices(deviceRules, toMetadata(flowMetadata))
 	}
 
 	// Wait for completion
@@ -400,3 +406,50 @@
 func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
 	return errors.New("flowModifyStrict not implemented")
 }
+
+// TODO: Remove this helper, just pass the map through to functions directly
+func toMetadata(meters map[uint32]*ofp.OfpMeterConfig) *voltha.FlowMetadata {
+	ctr, ret := 0, make([]*ofp.OfpMeterConfig, len(meters))
+	for _, meter := range meters {
+		ret[ctr] = meter
+		ctr++
+	}
+	return &voltha.FlowMetadata{Meters: ret}
+}
+
+func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
+	logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
+				if err := flowHandle.Delete(ctx); err != nil {
+					//TODO: Consider carrying on and deleting the remaining flows instead of returning.
+					//Either way an error is returned to the controller, which will likely result in a
+					//re-deletion; how should that new deletion request be handled? (Same for group deletion.)
+					flowHandle.Unlock()
+					return err
+				}
+			}
+			flowHandle.Unlock()
+		}
+	}
+	return nil
+}
+
+func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
+	logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
+	flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			if flow := flowHandle.GetReadOnly(); fu.FlowHasOutGroup(flow, groupID) {
+				if err := flowHandle.Delete(ctx); err != nil {
+					flowHandle.Unlock()
+					return nil, err
+				}
+				flowsRemoved[flowID] = flow
+			}
+			flowHandle.Unlock()
+		}
+	}
+	return flowsRemoved, nil
+}