VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
An attempt was made to ensure that locks are held while updates are made, by returning a "handle" object from each flow/group/meter lock() call, and only allowing access through this call.

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters which are returned are NOT cloned, and MUST NOT be modified by users of the flow/group/meter loaders.  In addition, flows/groups/meters which are given to the loaders MUST NOT be modified afterward.

There remain many cases where errors during particular kv updates may cause inconsistent state.  TODOs have been added for many of these cases.  Resolving this may require exposing (through voltha-lib-go) the transaction mechanism from etcd.

There is also the issue that locking a flow/meter/group while another flow/meter/group is held could cause deadlocks.  This can be avoided by acquiring locks in a consistent order.  Something to keep in mind while fixing the previous issue.
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/flow/common.go b/rw_core/core/device/flow/common.go
new file mode 100644
index 0000000..ca8bb6a
--- /dev/null
+++ b/rw_core/core/device/flow/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package flow Common Logger initialization
+package flow
+
+import (
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Register this package with the log library so that its log level can be modified at run time
+	var err error
+	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "flow"})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/rw_core/core/device/flow/loader.go b/rw_core/core/device/flow/loader.go
new file mode 100644
index 0000000..741a45f
--- /dev/null
+++ b/rw_core/core/device/flow/loader.go
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to flow state updates
+type Loader struct {
+	// this lock protects the flows map, it does not protect individual flows
+	lock  sync.RWMutex
+	flows map[uint64]*chunk
+
+	dbProxy         *model.Proxy
+	logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+}
+
+// chunk keeps a flow and the lock for this flow
+type chunk struct {
+	// this lock is used to synchronize all access to the flow, and also to the "deleted" variable
+	lock    sync.Mutex
+	deleted bool
+
+	flow *ofp.OfpFlowStats
+}
+
+func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+	return &Loader{
+		flows:           make(map[uint64]*chunk),
+		dbProxy:         dataProxy,
+		logicalDeviceID: logicalDeviceID,
+	}
+}
+
+// Load queries existing flows from the kv,
+// and should only be called once when first created.
+func (loader *Loader) Load(ctx context.Context) {
+	loader.lock.Lock()
+	defer loader.lock.Unlock()
+
+	var flows []*ofp.OfpFlowStats
+	if err := loader.dbProxy.List(ctx, "logical_flows/"+loader.logicalDeviceID, &flows); err != nil {
+		logger.Errorw("failed-to-list-flows-from-cluster-data-proxy", log.Fields{"error": err})
+		return
+	}
+	for _, flow := range flows {
+		loader.flows[flow.Id] = &chunk{flow: flow}
+	}
+}
+
+// LockOrCreate locks this flow if it exists, or creates a new flow if it does not.
+// In the case of flow creation, the provided "flow" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, flow *ofp.OfpFlowStats) (*Handle, bool, error) {
+	// try to use read lock instead of full lock if possible
+	if handle, have := loader.Lock(flow.Id); have {
+		return handle, false, nil
+	}
+
+	loader.lock.Lock()
+	entry, have := loader.flows[flow.Id]
+	if !have {
+		entry := &chunk{flow: flow}
+		loader.flows[flow.Id] = entry
+		entry.lock.Lock()
+		loader.lock.Unlock()
+
+		flowID := strconv.FormatUint(uint64(flow.Id), 10)
+		if err := loader.dbProxy.AddWithID(ctx, "logical_flows/"+loader.logicalDeviceID, flowID, flow); err != nil {
+			logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "flowID": flowID, "err": err})
+
+			// revert the map
+			loader.lock.Lock()
+			delete(loader.flows, flow.Id)
+			loader.lock.Unlock()
+
+			entry.deleted = true
+			entry.lock.Unlock()
+			return nil, false, err
+		}
+		return &Handle{loader: loader, chunk: entry}, true, nil
+	}
+	loader.lock.Unlock()
+
+	entry.lock.Lock()
+	if entry.deleted {
+		entry.lock.Unlock()
+		return loader.LockOrCreate(ctx, flow)
+	}
+	return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this flow, and returns a handle which can be used to access the flow until it's unlocked.
+// This handle ensures that the flow cannot be accessed if the lock is not held.
+// Returns false if the flow is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint64) (*Handle, bool) {
+	loader.lock.RLock()
+	entry, have := loader.flows[id]
+	loader.lock.RUnlock()
+
+	if !have {
+		return nil, false
+	}
+
+	entry.lock.Lock()
+	if entry.deleted {
+		entry.lock.Unlock()
+		return loader.Lock(id)
+	}
+	return &Handle{loader: loader, chunk: entry}, true
+}
+
+type Handle struct {
+	loader *Loader
+	chunk  *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpFlowStats which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpFlowStats {
+	return h.chunk.flow
+}
+
+// Update updates an existing flow in the kv.
+// The provided "flow" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, flow *ofp.OfpFlowStats) error {
+	path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, flow.Id)
+	if err := h.loader.dbProxy.Update(ctx, path, flow); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", h.loader.logicalDeviceID, flow.Id, err)
+	}
+	h.chunk.flow = flow
+	return nil
+}
+
+// Delete removes the device from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+	path := fmt.Sprintf("logical_flows/%s/%d", h.loader.logicalDeviceID, h.chunk.flow.Id)
+	if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
+	}
+	h.chunk.deleted = true
+
+	h.loader.lock.Lock()
+	delete(h.loader.flows, h.chunk.flow.Id)
+	h.loader.lock.Unlock()
+
+	h.Unlock()
+	return nil
+}
+
+// Unlock releases the lock on the flow
+func (h *Handle) Unlock() {
+	if h.chunk != nil {
+		h.chunk.lock.Unlock()
+		h.chunk = nil // attempting to access the flow through this handle in future will panic
+	}
+}
+
+// List returns a snapshot of all the managed flow IDs
+// TODO: iterating through flows safely is expensive now, since all flows are stored & locked separately
+//       should avoid this where possible
+func (loader *Loader) List() map[uint64]struct{} {
+	loader.lock.RLock()
+	defer loader.lock.RUnlock()
+	// copy the IDs so caller can safely iterate
+	ret := make(map[uint64]struct{}, len(loader.flows))
+	for id := range loader.flows {
+		ret[id] = struct{}{}
+	}
+	return ret
+}
diff --git a/rw_core/core/device/flow/loader_test.go b/rw_core/core/device/flow/loader_test.go
new file mode 100644
index 0000000..b739272
--- /dev/null
+++ b/rw_core/core/device/flow/loader_test.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flow
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"regexp"
+	"strconv"
+	"strings"
+	"testing"
+)
+
+// TestLoadersIdentical ensures that the flow, group, and meter loader.go files are line-for-line identical once type-specific names are normalized.
+func TestLoadersIdentical(t *testing.T) {
+	identical := [][]string{
+		{"ofp\\.OfpFlowStats", "ofp\\.OfpGroupEntry", "ofp\\.OfpMeterEntry"},
+		{"\\.Id", "\\.Desc\\.GroupId", "\\.Config.MeterId"},
+		{"uint64", "uint32", "uint32"},
+		{"Flow", "Group", "Meter"},
+		{"flow", "group", "meter"},
+	}
+
+	regexes := make([]*regexp.Regexp, len(identical))
+	for i, group := range identical {
+		regexes[i] = regexp.MustCompile(strings.Join(group, "|"))
+	}
+
+	for i := 1; i < len(identical[0]); i++ {
+		if err := compare(regexes, "../"+identical[4][0]+"/loader.go", "../"+identical[4][i]+"/loader.go"); err != nil {
+			t.Error(err)
+			return
+		}
+	}
+}
+
+func compare(regexes []*regexp.Regexp, fileNameA, fileNameB string) error {
+	fileA, err := os.Open(fileNameA)
+	if err != nil {
+		return err
+	}
+	defer fileA.Close()
+
+	fileB, err := os.Open(fileNameB)
+	if err != nil {
+		return err
+	}
+	defer fileB.Close()
+
+	scannerA, scannerB := bufio.NewScanner(fileA), bufio.NewScanner(fileB)
+
+	spaceRegex := regexp.MustCompile(" +")
+
+	line := 1
+	for {
+		if continueA, continueB := scannerA.Scan(), scannerB.Scan(); continueA != continueB {
+			if !continueA && continueB {
+				if err := scannerA.Err(); err != nil {
+					return err
+				}
+			}
+			if continueA && !continueB {
+				if err := scannerB.Err(); err != nil {
+					return err
+				}
+			}
+			return fmt.Errorf("line %d: files are not the same length", line)
+		} else if !continueA {
+			// both scanners stopped together (EOF or a read error); scanner errors are checked after the loop
+			break
+		}
+
+		textA, textB := scannerA.Text(), scannerB.Text()
+
+		replacedA, replacedB := textA, textB
+		for i, regex := range regexes {
+			replacement := "{{type" + strconv.Itoa(i) + "}}"
+			replacedA, replacedB = regex.ReplaceAllString(replacedA, replacement), regex.ReplaceAllString(replacedB, replacement)
+		}
+
+		// collapse each run of spaces into one so that space-only alignment differences are ignored
+		replacedA, replacedB = spaceRegex.ReplaceAllString(replacedA, " "), spaceRegex.ReplaceAllString(replacedB, " ")
+
+		if replacedA != replacedB {
+			return fmt.Errorf("line %d: files %s and %s do not match: \n\t%s\n\t%s\n\n\t%s\n\t%s", line, fileNameA, fileNameB, textA, textB, replacedA, replacedB)
+		}
+
+		line++
+	}
+
+	if err := scannerA.Err(); err != nil {
+		return err
+	}
+	if err := scannerB.Err(); err != nil {
+		return err
+	}
+	return nil
+}