VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter lock() call, and only allowing access through this call.

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters that are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders.  In addition, flows/groups/meters that are given to the loaders MUST NOT be modified afterward.

There remain many cases where errors during particular kv updates may cause inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.

There is also the issue that locking a flow/meter/group while another flow/meter/group is held could cause deadlocks.  This can be avoided by acquiring locks in a consistent order.  Something to keep in mind while fixing the previous issue.
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index c211f1e..dc44fda 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -28,6 +27,19 @@
 	"google.golang.org/grpc/status"
 )
 
+// listLogicalDeviceMeters returns the meters known to the meter loader, keyed by meter ID; each meter is locked only while its entry is read, and the returned entries are read-only (see GetReadOnly) and must not be modified by the caller.
+func (agent *LogicalAgent) listLogicalDeviceMeters() map[uint32]*ofp.OfpMeterEntry {
+	meterIDs := agent.meterLoader.List()
+	meters := make(map[uint32]*ofp.OfpMeterEntry, len(meterIDs))
+	for meterID := range meterIDs {
+		if meterHandle, have := agent.meterLoader.Lock(meterID); have { // have is false when the loader has no such meter; it is simply left out of the result
+			meters[meterID] = meterHandle.GetReadOnly()
+			meterHandle.Unlock()
+		}
+	}
+	return meters
+}
+
 // updateMeterTable updates the meter table of that logical device
 func (agent *LogicalAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
 	logger.Debug("updateMeterTable")
@@ -53,34 +65,18 @@
 	}
 
 	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
-	agent.meterLock.Lock()
-	//check if the meter already exists or not
-	_, ok := agent.meters[meterMod.MeterId]
-	if ok {
-		logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
-		agent.meterLock.Unlock()
-		return nil
-	}
 
-	mChunk := MeterChunk{
-		meter: meterEntry,
-	}
-	//Add to map and acquire the per meter lock
-	agent.meters[meterMod.MeterId] = &mChunk
-	mChunk.lock.Lock()
-	defer mChunk.lock.Unlock()
-	agent.meterLock.Unlock()
-	meterID := strconv.Itoa(int(meterMod.MeterId))
-	if err := agent.clusterDataProxy.AddWithID(ctx, "meters/"+agent.logicalDeviceID, meterID, meterEntry); err != nil {
-		logger.Errorw("failed-adding-meter", log.Fields{"deviceID": agent.logicalDeviceID, "meterID": meterID, "err": err})
-		//Revert the map
-		agent.meterLock.Lock()
-		delete(agent.meters, meterMod.MeterId)
-		agent.meterLock.Unlock()
+	meterHandle, created, err := agent.meterLoader.LockOrCreate(ctx, meterEntry)
+	if err != nil {
 		return err
 	}
+	defer meterHandle.Unlock()
 
-	logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
+	if created {
+		logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
+	} else {
+		logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+	}
 	return nil
 }
 
@@ -89,25 +85,25 @@
 	if meterMod == nil {
 		return nil
 	}
-	agent.meterLock.RLock()
-	meterChunk, ok := agent.meters[meterMod.MeterId]
-	agent.meterLock.RUnlock()
-	if ok {
-		//Dont let anyone to do any changes to this meter until this is done.
-		//And wait if someone else is already making modifications. Do this with per meter lock.
-		meterChunk.lock.Lock()
-		defer meterChunk.lock.Unlock()
-		if err := agent.deleteFlowsOfMeter(ctx, meterMod.MeterId); err != nil {
-			return err
-		}
-		//remove from the store and cache
-		if err := agent.removeLogicalDeviceMeter(ctx, meterMod.MeterId); err != nil {
-			return err
-		}
-		logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
-	} else {
+
+	meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+	if !have {
 		logger.Warnw("meter-not-found", log.Fields{"meterID": meterMod.MeterId})
+		return nil
 	}
+	defer meterHandle.Unlock()
+
+	//TODO: A meter lock is held here while flow lock(s) are acquired, if this is done in opposite order anywhere
+	//      there's potential for deadlock.
+	if err := agent.deleteFlowsHavingMeter(ctx, meterMod.MeterId); err != nil {
+		return err
+	}
+
+	if err := meterHandle.Delete(ctx); err != nil {
+		return err
+	}
+
+	logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
 	return nil
 }
 
@@ -116,21 +112,18 @@
 	if meterMod == nil {
 		return nil
 	}
-	newMeter := fu.MeterEntryFromMeterMod(meterMod)
-	agent.meterLock.RLock()
-	meterChunk, ok := agent.meters[newMeter.Config.MeterId]
-	agent.meterLock.RUnlock()
-	if !ok {
-		return fmt.Errorf("no-meter-to-modify:%d", newMeter.Config.MeterId)
+
+	meterHandle, have := agent.meterLoader.Lock(meterMod.MeterId)
+	if !have {
+		return fmt.Errorf("no-meter-to-modify: %d", meterMod.MeterId)
 	}
-	//Release the map lock and syncronize per meter
-	meterChunk.lock.Lock()
-	defer meterChunk.lock.Unlock()
-	oldMeter := meterChunk.meter
+	defer meterHandle.Unlock()
+
+	oldMeter := meterHandle.GetReadOnly()
+	newMeter := fu.MeterEntryFromMeterMod(meterMod)
 	newMeter.Stats.FlowCount = oldMeter.Stats.FlowCount
 
-	if err := agent.updateLogicalDeviceMeter(ctx, newMeter, meterChunk); err != nil {
-		logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "meterID": newMeter.Config.MeterId})
+	if err := meterHandle.Update(ctx, newMeter); err != nil {
 		return err
 	}
 	logger.Debugw("replaced-with-new-meter", log.Fields{"oldMeter": oldMeter, "newMeter": newMeter})