VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter lock() call, and only allowing access through this call.

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters which are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders.  In addition, flows/groups/meters which are given to the loaders MUST NOT be modified afterward.

There remain many cases where errors during particular kv updates may cause inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.

There is also the issue that locking a flow/meter/group while another flow/meter/group is held could cause deadlocks.  This can be avoided by acquiring locks in a consistent order, which is something to keep in mind while fixing the previous issue.
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7723b74..dbb2da5 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -25,6 +25,9 @@
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/core/device/flow"
+	"github.com/opencord/voltha-go/rw_core/core/device/group"
+	"github.com/opencord/voltha-go/rw_core/core/device/meter"
 	fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
 	"github.com/opencord/voltha-go/rw_core/route"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
@@ -56,32 +59,31 @@
 	startOnce          sync.Once
 	stopOnce           sync.Once
 
-	meters    map[uint32]*MeterChunk
-	meterLock sync.RWMutex
-	flows     map[uint64]*FlowChunk
-	flowLock  sync.RWMutex
-	groups    map[uint32]*GroupChunk
-	groupLock sync.RWMutex
+	flowLoader  *flow.Loader
+	meterLoader *meter.Loader
+	groupLoader *group.Loader
 }
 
 func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
 	deviceMgr *Manager, cdProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
-	var agent LogicalAgent
-	agent.logicalDeviceID = id
-	agent.serialNumber = sn
-	agent.rootDeviceID = deviceID
-	agent.deviceMgr = deviceMgr
-	agent.clusterDataProxy = cdProxy
-	agent.ldeviceMgr = ldeviceMgr
-	agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
-	agent.logicalPortsNo = make(map[uint32]bool)
-	agent.defaultTimeout = defaultTimeout
-	agent.requestQueue = coreutils.NewRequestQueue()
-	agent.meters = make(map[uint32]*MeterChunk)
-	agent.flows = make(map[uint64]*FlowChunk)
-	agent.groups = make(map[uint32]*GroupChunk)
+	agent := &LogicalAgent{
+		logicalDeviceID:  id,
+		serialNumber:     sn,
+		rootDeviceID:     deviceID,
+		deviceMgr:        deviceMgr,
+		clusterDataProxy: cdProxy,
+		ldeviceMgr:       ldeviceMgr,
+		flowDecomposer:   fd.NewFlowDecomposer(deviceMgr),
+		logicalPortsNo:   make(map[uint32]bool),
+		defaultTimeout:   defaultTimeout,
+		requestQueue:     coreutils.NewRequestQueue(),
+
+		flowLoader:  flow.NewLoader(cdProxy, id),
+		meterLoader: meter.NewLoader(cdProxy, id),
+		groupLoader: group.NewLoader(cdProxy, id),
+	}
 	agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
-	return &agent
+	return agent
 }
 
 // start creates the logical device and add it to the data model
@@ -161,9 +163,9 @@
 		// Setup the local list of logical ports
 		agent.addLogicalPortsToMap(ld.Ports)
 		// load the flows, meters and groups from KV to cache
-		agent.loadFlows(ctx)
-		agent.loadMeters(ctx)
-		agent.loadGroups(ctx)
+		agent.flowLoader.Load(ctx)
+		agent.meterLoader.Load(ctx)
+		agent.groupLoader.Load(ctx)
 	}
 
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
@@ -234,20 +236,6 @@
 	}
 
 	agent.logicalDevice = logicalDevice
-
-	return nil
-}
-
-func (agent *LogicalAgent) deleteFlowAndUpdateMeterStats(ctx context.Context, mod *ofp.OfpFlowMod, chunk *FlowChunk) error {
-	chunk.lock.Lock()
-	defer chunk.lock.Unlock()
-	if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, chunk.flow, false); !changedMeter {
-		return fmt.Errorf("Cannot-delete-flow-%s. Meter-update-failed", chunk.flow)
-	}
-	// Update store and cache
-	if err := agent.removeLogicalDeviceFlow(ctx, chunk.flow.Id); err != nil {
-		return fmt.Errorf("Cannot-delete-flows-%s. Delete-from-store-failed", chunk.flow)
-	}
 	return nil
 }
 
@@ -314,10 +302,10 @@
 	return responses
 }
 
-func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsFromParentDevice(flows map[uint64]*ofp.OfpFlowStats, metadata *voltha.FlowMetadata) []coreutils.Response {
 	logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
 	responses := make([]coreutils.Response, 0)
-	for _, flow := range flows.Items {
+	for _, flow := range flows {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		uniPort, err := agent.getUNILogicalPortNo(flow)