VOL-3121 - Separated out LogicalDevices' low-level flow/meter/group handling into separate packages.

The new implementation hides the complexity of locking, caching, and interacting with the db.
To help ensure that locks are held while updates are made, each flow/group/meter Lock() call returns a "handle" object, and the flow/group/meter can only be accessed through that handle.
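For illustration, a typical read-only access through the group loader added below might look like this (hypothetical caller; assumes the new package and the openflow_13 protos are imported as "group" and "ofp"):

    func getGroupDesc(loader *group.Loader, id uint32) (*ofp.OfpGroupDesc, bool) {
        handle, have := loader.Lock(id)
        if !have {
            return nil, false
        }
        defer handle.Unlock()
        // the entry is shared with the loader and must be treated as read-only
        return handle.GetReadOnly().Desc, true
    }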

An attempt was also made to remove proto.Clone-ing.  Flows/groups/meters returned by the loaders are NOT cloned, and MUST NOT be modified by their users.  Likewise, flows/groups/meters handed to the loaders MUST NOT be modified afterward.
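Concretely, a caller that needs to modify a stored entry should clone it and hand the clone back, along these lines (hypothetical helper; proto.Clone from github.com/golang/protobuf/proto):

    func bumpRefCount(ctx context.Context, h *group.Handle) error {
        // GetReadOnly()'s result is shared, so clone before changing anything
        newGroup := proto.Clone(h.GetReadOnly()).(*ofp.OfpGroupEntry)
        newGroup.Stats.RefCount++
        // once passed to Update(), newGroup belongs to the loader; don't modify it again
        return h.Update(ctx, newGroup)
    }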

There remain many cases where an error partway through a kv update can leave state inconsistent.  TODOs have been added for many of these cases.  Resolving them may require exposing (through voltha-lib-go) etcd's transaction mechanism.
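For reference, etcd's transaction API (raw clientv3, i.e. go.etcd.io/etcd/clientv3, not currently exposed by voltha-lib-go) would allow a conditional write roughly like this sketch:

    // write key only if it has not been modified since revision rev
    func atomicPut(ctx context.Context, cli *clientv3.Client, key string, rev int64, value []byte) (bool, error) {
        resp, err := cli.Txn(ctx).
            If(clientv3.Compare(clientv3.ModRevision(key), "=", rev)).
            Then(clientv3.OpPut(key, string(value))).
            Commit()
        if err != nil {
            return false, err
        }
        // Succeeded is false if another writer got there first; the caller must retry or reconcile
        return resp.Succeeded, nil
    }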

There is also the risk of deadlock when locking a flow/meter/group while another flow/meter/group is already held.  This can be avoided by always acquiring locks in a consistent order (see the sketch below); something to keep in mind while fixing the previous issue.
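A minimal sketch of that ordering rule, as a hypothetical helper over the group loader:

    // acquire several group locks deadlock-free by always locking in ascending-ID order
    func lockAll(loader *group.Loader, ids []uint32) []*group.Handle {
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
        handles := make([]*group.Handle, 0, len(ids))
        for _, id := range ids {
            if h, have := loader.Lock(id); have {
                handles = append(handles, h)
            }
        }
        return handles // each handle must later be Unlock()ed by the caller
    }
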
Change-Id: I146eb319c3564635fdc461ec17be13e6f3968cf7
diff --git a/rw_core/core/device/group/loader.go b/rw_core/core/device/group/loader.go
new file mode 100644
index 0000000..0e3f078
--- /dev/null
+++ b/rw_core/core/device/group/loader.go
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package group
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// Loader hides all low-level locking & synchronization related to group state updates
+type Loader struct {
+	// this lock protects the groups map; it does not protect individual groups
+	lock   sync.RWMutex
+	groups map[uint32]*chunk
+
+	dbProxy         *model.Proxy
+	logicalDeviceID string // TODO: dbProxy should already have the logicalDeviceID component of the path internally
+}
+
+// chunk keeps a group and the lock for this group
+type chunk struct {
+	// this lock is used to synchronize all access to the group, and also to the "deleted" variable
+	lock    sync.Mutex
+	deleted bool
+
+	group *ofp.OfpGroupEntry
+}
+
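+// NewLoader instantiates a Loader for the groups of the given logical device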
+func NewLoader(dataProxy *model.Proxy, logicalDeviceID string) *Loader {
+	return &Loader{
+		groups:          make(map[uint32]*chunk),
+		dbProxy:         dataProxy,
+		logicalDeviceID: logicalDeviceID,
+	}
+}
+
+// Load queries existing groups from the kv,
+// and should only be called once, when the loader is first created.
+func (loader *Loader) Load(ctx context.Context) {
+	loader.lock.Lock()
+	defer loader.lock.Unlock()
+
+	var groups []*ofp.OfpGroupEntry
+	if err := loader.dbProxy.List(ctx, "logical_groups/"+loader.logicalDeviceID, &groups); err != nil {
+		logger.Errorw("failed-to-list-groups-from-cluster-data-proxy", log.Fields{"error": err})
+		return
+	}
+	for _, group := range groups {
+		loader.groups[group.Desc.GroupId] = &chunk{group: group}
+	}
+}
+
+// LockOrCreate locks this group if it exists, or creates a new group if it does not.
+// In the case of group creation, the provided "group" must not be modified afterwards.
+func (loader *Loader) LockOrCreate(ctx context.Context, group *ofp.OfpGroupEntry) (*Handle, bool, error) {
+	// try to use read lock instead of full lock if possible
+	if handle, have := loader.Lock(group.Desc.GroupId); have {
+		return handle, false, nil
+	}
+
+	loader.lock.Lock()
+	entry, have := loader.groups[group.Desc.GroupId]
+	if !have {
+		entry := &chunk{group: group}
+		loader.groups[group.Desc.GroupId] = entry
+		entry.lock.Lock()
+		loader.lock.Unlock()
+
+		groupID := strconv.FormatUint(uint64(group.Desc.GroupId), 10)
+		if err := loader.dbProxy.AddWithID(ctx, "logical_groups/"+loader.logicalDeviceID, groupID, group); err != nil {
+			logger.Errorw("failed-adding-group-to-db", log.Fields{"deviceID": loader.logicalDeviceID, "groupID": groupID, "err": err})
+
+			// revert the map
+			loader.lock.Lock()
+			delete(loader.groups, group.Desc.GroupId)
+			loader.lock.Unlock()
+
+			entry.deleted = true
+			entry.lock.Unlock()
+			return nil, false, err
+		}
+		return &Handle{loader: loader, chunk: entry}, true, nil
+	}
+	loader.lock.Unlock()
+
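+	// the group may have been deleted while we waited for its lock; if so, retry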
+	entry.lock.Lock()
+	if entry.deleted {
+		entry.lock.Unlock()
+		return loader.LockOrCreate(ctx, group)
+	}
+	return &Handle{loader: loader, chunk: entry}, false, nil
+}
+
+// Lock acquires the lock for this group, and returns a handle which can be used to access the group until it's unlocked.
+// This handle ensures that the group cannot be accessed if the lock is not held.
+// Returns false if the group is not present.
+// TODO: consider accepting a ctx and aborting the lock attempt on cancellation
+func (loader *Loader) Lock(id uint32) (*Handle, bool) {
+	loader.lock.RLock()
+	entry, have := loader.groups[id]
+	loader.lock.RUnlock()
+
+	if !have {
+		return nil, false
+	}
+
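+	// the group may have been deleted while we waited for its lock; if so, retry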
+	entry.lock.Lock()
+	if entry.deleted {
+		entry.lock.Unlock()
+		return loader.Lock(id)
+	}
+	return &Handle{loader: loader, chunk: entry}, true
+}
+
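+// Handle is allocated per Lock() call; all access and updates are made through
+// it, and it is invalidated by Unlock(), enforcing correct lock usage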
+type Handle struct {
+	loader *Loader
+	chunk  *chunk
+}
+
+// GetReadOnly returns an *ofp.OfpGroupEntry which MUST NOT be modified externally, but which is safe to keep indefinitely
+func (h *Handle) GetReadOnly() *ofp.OfpGroupEntry {
+	return h.chunk.group
+}
+
+// Update updates an existing group in the kv.
+// The provided "group" must not be modified afterwards.
+func (h *Handle) Update(ctx context.Context, group *ofp.OfpGroupEntry) error {
+	path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, group.Desc.GroupId)
+	if err := h.loader.dbProxy.Update(ctx, path, group); err != nil {
+		return status.Errorf(codes.Internal, "failed-update-group:%s:%d %s", h.loader.logicalDeviceID, group.Desc.GroupId, err)
+	}
+	h.chunk.group = group
+	return nil
+}
+
+// Delete removes the group from the kv
+func (h *Handle) Delete(ctx context.Context) error {
+	path := fmt.Sprintf("logical_groups/%s/%d", h.loader.logicalDeviceID, h.chunk.group.Desc.GroupId)
+	if err := h.loader.dbProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
+	}
+	h.chunk.deleted = true
+
+	h.loader.lock.Lock()
+	delete(h.loader.groups, h.chunk.group.Desc.GroupId)
+	h.loader.lock.Unlock()
+
+	h.Unlock()
+	return nil
+}
+
+// Unlock releases the lock on the group
+func (h *Handle) Unlock() {
+	if h.chunk != nil {
+		h.chunk.lock.Unlock()
+		h.chunk = nil // attempting to access the group through this handle in future will panic
+	}
+}
+
+// List returns a snapshot of all the managed group IDs
+// TODO: iterating through groups safely is expensive now, since all groups are stored & locked separately
+//       should avoid this where possible
+func (loader *Loader) List() map[uint32]struct{} {
+	loader.lock.RLock()
+	defer loader.lock.RUnlock()
+	// copy the IDs so caller can safely iterate
+	ret := make(map[uint32]struct{}, len(loader.groups))
+	for id := range loader.groups {
+		ret[id] = struct{}{}
+	}
+	return ret
+}