VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter lock() call, and only allowing access through this call.

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters which are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders.  In addition, flows/groups/meters which are given to the loaders MUST NOT be modified afterward.

There remain many cases where errors during particular kv updates may cause inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.

There is also the issue that locking a flow/meter/group while another flow/meter/group is held could cause deadlocks.  This can be avoided by acquiring locks in a consistent order, which is something to keep in mind while fixing the previous issue.
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index eb7ad75..4c6ca8c 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -18,6 +18,8 @@
 import (
 	"context"
 	"errors"
+	"testing"
+
 	"github.com/opencord/voltha-go/rw_core/mocks"
 	"github.com/opencord/voltha-go/rw_core/route"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
@@ -26,8 +28,6 @@
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-
-	"testing"
 )
 
 type testDeviceManager struct {
@@ -476,11 +476,10 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -541,11 +540,10 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -605,11 +603,10 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -669,11 +666,10 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -737,10 +733,9 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -796,19 +791,19 @@
 	assert.Nil(t, err)
 	fs2, err := fu.MkFlowStat(fa2)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs, fs2}}
-	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+
+	fs.Instructions = []*ofp.OfpInstruction{{
 		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
 		Data: &ofp.OfpInstruction_GotoTable{
 			GotoTable: &ofp.OfpInstructionGotoTable{
 				TableId: 1,
 			},
 		}}}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs, fs2.Id: fs2}
 
-	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -905,19 +900,18 @@
 	assert.Nil(t, err)
 	fs2, err := fu.MkFlowStat(fa2)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs1, fs2}}
-	flows.Items[0].Instructions = []*ofp.OfpInstruction{{
+	fs1.Instructions = []*ofp.OfpInstruction{{
 		Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE),
 		Data: &ofp.OfpInstruction_GotoTable{
 			GotoTable: &ofp.OfpInstructionGotoTable{
 				TableId: 1,
 			},
 		}}}
+	flows := map[uint64]*ofp.OfpFlowStats{fs1.Id: fs1, fs2.Id: fs2}
 
-	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
-	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
+	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, nil)
 	assert.Nil(t, err)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -1004,8 +998,8 @@
 
 	fs, err := fu.MkFlowStat(fa)
 	assert.Nil(t, err)
-	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fs}}
-	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ga)}}
+	flows := map[uint64]*ofp.OfpFlowStats{fs.Id: fs}
+	groups := map[uint32]*ofp.OfpGroupEntry{ga.GroupId: fu.MkGroupStat(ga)}
 	tfd := newTestFlowDecomposer(t, newTestDeviceManager())
 
 	deviceRules, err := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)