VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter Lock() call, and only permitting access through that handle.
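
A sketch of the intended access pattern, using the names that appear in the diff below (illustrative only; the exact loader API may differ):

    groupHandle, have := agent.groupLoader.Lock(groupID) // returns a handle; have is false if the group is absent
    if !have {
        return fmt.Errorf("group-absent:%d", groupID)
    }
    defer groupHandle.Unlock()
    // Reads and writes are only possible through the handle, while the lock is held.
    logger.Debugw("group-entry", log.Fields{"entry": groupHandle.GetReadOnly()})
    return groupHandle.Update(ctx, fu.GroupEntryFromGroupMod(groupMod))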

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters returned by the loaders are NOT cloned, and MUST NOT be modified by their users.  Likewise, flows/groups/meters handed to the loaders MUST NOT be modified afterward.
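
For example (hypothetical caller code; under this contract, cloning before modification becomes the caller's responsibility):

    entry := groupHandle.GetReadOnly()
    // entry.Desc.GroupId = 42 // WRONG: would mutate state shared with the loader
    modifiable := proto.Clone(entry).(*ofp.OfpGroupEntry) // clone explicitly when a mutable copy is needed
    modifiable.Desc.GroupId = 42 // safe: only the caller's own copy is changed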

There remain many cases where an error during a particular KV update can leave inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.
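
For reference, a minimal sketch of such a transaction against etcd's clientv3 API directly (not currently exposed by voltha-lib-go; the client variable and key paths are hypothetical):

    // Delete a group and a flow that references it in one atomic step, so a
    // failure cannot leave one of them removed and the other still present.
    _, err := kvClient.Txn(ctx).
        If(clientv3.Compare(clientv3.Version(groupPath), ">", 0)).
        Then(clientv3.OpDelete(groupPath), clientv3.OpDelete(flowPath)).
        Commit()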

There is also the risk of deadlock when locking a flow/meter/group while holding the lock on another flow/meter/group.  This can be avoided by acquiring locks in a consistent order; something to keep in mind while fixing the previous issue.
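
A minimal sketch of that mitigation (hypothetical; not part of this change): collect the IDs, sort them, and always acquire in ascending order:

    ids := make([]uint32, 0, len(toLock))
    for id := range toLock {
        ids = append(ids, id)
    }
    sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    for _, id := range ids {
        if handle, have := agent.groupLoader.Lock(id); have {
            defer handle.Unlock() // hold all locks until the multi-group operation completes
            // ... operate on handle.GetReadOnly() ...
        }
    }
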
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 73caa07..e126fdc 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
@@ -30,6 +29,19 @@
 	"google.golang.org/grpc/status"
 )
 
+// listLogicalDeviceGroups returns the flow groups of the logical device
+func (agent *LogicalAgent) listLogicalDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
+	groupIDs := agent.groupLoader.List()
+	groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+	for groupID := range groupIDs {
+		if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+			groups[groupID] = groupHandle.GetReadOnly()
+			groupHandle.Unlock()
+		}
+	}
+	return groups
+}
+
 //updateGroupTable updates the group table of that logical device
 func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
 	logger.Debug("updateGroupTable")
@@ -53,36 +65,22 @@
 		return nil
 	}
 	logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
-	agent.groupLock.Lock()
-	_, ok := agent.groups[groupMod.GroupId]
-	if ok {
-		agent.groupLock.Unlock()
-		return fmt.Errorf("Group %d already exists", groupMod.GroupId)
-	}
 
 	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
-	groupChunk := GroupChunk{
-		group: groupEntry,
-	}
-	//add to map
-	agent.groups[groupMod.GroupId] = &groupChunk
-	groupChunk.lock.Lock()
-	defer groupChunk.lock.Unlock()
-	agent.groupLock.Unlock()
-	//add to the kv store
-	path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
-	groupID := strconv.Itoa(int(groupMod.GroupId))
-	if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
-		logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
-		agent.groupLock.Lock()
-		delete(agent.groups, groupMod.GroupId)
-		agent.groupLock.Unlock()
+
+	groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, groupEntry)
+	if err != nil {
 		return err
 	}
-	deviceRules := fu.NewDeviceRules()
-	deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+	groupHandle.Unlock()
+
+	if !created {
+		return fmt.Errorf("group %d already exists", groupMod.GroupId)
+	}
+
 	fg := fu.NewFlowsAndGroups()
-	fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+	fg.AddGroup(groupEntry)
+	deviceRules := fu.NewDeviceRules()
 	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 	logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
@@ -105,63 +103,43 @@
 	if groupMod == nil {
 		return nil
 	}
-	affectedFlows := make([]*ofp.OfpFlowStats, 0)
-	affectedGroups := make([]*ofp.OfpGroupEntry, 0)
-	var groupsChanged bool
-	groupID := groupMod.GroupId
-	var err error
-	if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
-		if err := func() error {
-			agent.groupLock.Lock()
-			defer agent.groupLock.Unlock()
-			for key, groupChunk := range agent.groups {
-				//Remove from store and cache. Do this in a one time lock allocation.
-				path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
-				if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
-					return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
-				}
-				delete(agent.groups, groupID)
-				var flows []*ofp.OfpFlowStats
-				if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
-					logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
-					return err
-				}
-				affectedFlows = append(affectedFlows, flows...)
-				affectedGroups = append(affectedGroups, groupChunk.group)
-			}
-			return nil
-		}(); err != nil {
-			return err
-		}
-		groupsChanged = true
-	} else {
-		agent.groupLock.RLock()
-		groupChunk, ok := agent.groups[groupID]
-		agent.groupLock.RUnlock()
-		if !ok {
-			logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
-			return nil
-		}
-		groupChunk.lock.Lock()
-		defer groupChunk.lock.Unlock()
-		var flows []*ofp.OfpFlowStats
-		if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
-			logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
-			return err
-		}
-		//remove from store
-		if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
-			return err
-		}
-		affectedFlows = append(affectedFlows, flows...)
-		affectedGroups = append(affectedGroups, groupChunk.group)
-		groupsChanged = true
 
+	affectedFlows := make(map[uint64]*ofp.OfpFlowStats)
+	affectedGroups := make(map[uint32]*ofp.OfpGroupEntry)
+	var groupsChanged bool
+
+	toDelete := map[uint32]struct{}{groupMod.GroupId: {}}
+	if groupMod.GroupId == uint32(ofp.OfpGroup_OFPG_ALL) {
+		toDelete = agent.groupLoader.List()
 	}
 
-	if err != nil || groupsChanged {
-		var deviceRules *fu.DeviceRules
-		deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
+	for groupID := range toDelete {
+		if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+			affectedGroups[groupID] = groupHandle.GetReadOnly()
+			if err := groupHandle.Delete(ctx); err != nil {
+				groupHandle.Unlock()
+				return err
+			}
+			groupHandle.Unlock()
+
+			//TODO: this is another case where ordering guarantees are not being made,
+			//      group deletion does not guarantee deletion of corresponding flows.
+			//      an error while deleting flows can cause inconsistent state.
+			flows, err := agent.deleteFlowsHavingGroup(ctx, groupID)
+			if err != nil {
+				logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
+				return err
+			}
+			for flowID, flow := range flows {
+				affectedFlows[flowID] = flow
+			}
+		}
+	}
+	groupsChanged = true
+
+	//TODO: groupsChanged is always true here?  use `len(affectedFlows)!=0` or `len(affectedGroups)!=0` instead?
+	if groupsChanged {
+		deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, affectedFlows, affectedGroups)
 		if err != nil {
 			return err
 		}
@@ -188,15 +165,13 @@
 	}
 
 	groupID := groupMod.GroupId
-	agent.groupLock.RLock()
-	groupChunk, ok := agent.groups[groupID]
-	agent.groupLock.RUnlock()
-	if !ok {
+
+	groupHandle, have := agent.groupLoader.Lock(groupID)
+	if !have {
 		return fmt.Errorf("group-absent:%d", groupID)
 	}
-	//Don't let any other thread to make modifications to this group till all done here.
-	groupChunk.lock.Lock()
-	defer groupChunk.lock.Unlock()
+	defer groupHandle.Unlock()
+
 	//replace existing group entry with new group definition
 	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
 	deviceRules := fu.NewDeviceRules()
@@ -206,8 +181,9 @@
 	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 	logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+
 	//update KV
-	if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
+	if err := groupHandle.Update(ctx, groupEntry); err != nil {
 		logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 		return err
 	}