[VOL-2688] Improve core model performance

This commit addresses the low-hanging performance hogs in the
core model.  In particular, the following changes are made:

1) Remove proto message comparison where possible.  The proto
message deep comparison is quite expensive.
2) Since the Core already has a lock on the device/logicaldevice/
adapters/etc. before invoking the model proxy, there is no need
for the latter to create an additional lock on these artifacts
during an update.
3) The model creates a watch on every artifact it adds to the KV
store.  Since in the next Voltha release we will not be using Voltha
Cores in pairs, there is no point in keeping these watches (there
is only 1 Core that will ever update an artifact in the next
deployment).  This update removes these watches.
4) Additional unit tests have been created, mostly around flows, in an
attempt to exercise both the core and the model further.

Change-Id: Ieaf1f6b9b05c56e819600bc55b46a05f73b8efcf
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 3637e9a..15e438c 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -25,7 +25,6 @@
 	"sync"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -42,21 +41,6 @@
 	versionMutex sync.RWMutex
 	Version      int64
 	isStored     bool
-	isWatched    bool
-}
-
-type watchCache struct {
-	Cache sync.Map
-}
-
-var watchCacheInstance *watchCache
-var watchCacheOne sync.Once
-
-func watches() *watchCache {
-	watchCacheOne.Do(func() {
-		watchCacheInstance = &watchCache{Cache: sync.Map{}}
-	})
-	return watchCacheInstance
 }
 
 // NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -119,154 +103,6 @@
 	}
 }
 
-// SetupWatch -
-func (pr *PersistedRevision) SetupWatch(ctx context.Context, key string) {
-	if key == "" {
-		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-		return
-	}
-
-	if _, exists := watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
-		return
-	}
-
-	if pr.events == nil {
-		pr.events = make(chan *kvstore.Event)
-
-		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-
-		pr.SetName(key)
-		pr.events = pr.kvStore.CreateWatch(ctx, key, false)
-	}
-
-	if !pr.isWatched {
-		pr.isWatched = true
-
-		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
-
-		// Start watching
-		go pr.startWatching(ctx)
-	}
-}
-
-func (pr *PersistedRevision) startWatching(ctx context.Context) {
-	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-
-StopWatchLoop:
-	for {
-		latestRev := pr.GetBranch().GetLatest()
-		event, ok := <-pr.events
-		if !ok {
-			log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-			break StopWatchLoop
-		}
-		log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
-
-		switch event.EventType {
-		case kvstore.DELETE:
-			log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-
-			// Remove reference from cache
-			getRevCache().Delete(latestRev.GetName())
-
-			// Remove reference from parent
-			parent := pr.GetBranch().Node.GetRoot()
-			parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
-
-			break StopWatchLoop
-
-		case kvstore.PUT:
-			log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-			if latestRev.getVersion() >= event.Version {
-				log.Debugw("skipping-matching-or-older-revision", log.Fields{
-					"watch":          latestRev.GetName(),
-					"watch-version":  event.Version,
-					"latest-version": latestRev.getVersion(),
-				})
-				continue
-			} else {
-				log.Debugw("watch-revision-is-newer", log.Fields{
-					"watch":          latestRev.GetName(),
-					"watch-version":  event.Version,
-					"latest-version": latestRev.getVersion(),
-				})
-			}
-
-			data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
-
-			if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-				log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
-			} else {
-				log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
-
-				var pathLock string
-
-				// The watch reported new persistence data.
-				// Construct an object that will be used to update the memory
-				blobs := make(map[string]*kvstore.KVPair)
-				key, _ := kvstore.ToString(event.Key)
-				blobs[key] = &kvstore.KVPair{
-					Key:     key,
-					Value:   event.Value,
-					Session: "",
-					Lease:   0,
-					Version: event.Version,
-				}
-
-				if latestRev.getNode().GetProxy() != nil {
-					//
-					// If a proxy exists for this revision, use it to lock access to the path
-					// and prevent simultaneous updates to the object in memory
-					//
-
-					//If the proxy already has a request in progress, then there is no need to process the watch
-					if latestRev.getNode().GetProxy().GetOperation() != ProxyNone {
-						log.Debugw("operation-in-progress", log.Fields{
-							"key":       latestRev.GetHash(),
-							"path":      latestRev.getNode().GetProxy().getFullPath(),
-							"operation": latestRev.getNode().GetProxy().operation.String(),
-						})
-						continue
-					}
-
-					pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
-
-					// Reserve the path to prevent others to modify while we reload from persistence
-					if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
-						log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
-					}
-					latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
-
-					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
-						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
-					}
-
-					// Release path
-					if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_"); err != nil {
-						log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
-					}
-				} else {
-					// This block should be reached only if coming from a non-proxied request
-					log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
-
-					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
-						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
-					}
-				}
-			}
-
-		default:
-			log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
-		}
-	}
-
-	watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
-
-	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
-
 // UpdateData modifies the information in the data model and saves it in the persistent storage
 func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
 	log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
@@ -274,12 +110,11 @@
 	newNPR := pr.Revision.UpdateData(ctx, data, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -300,12 +135,11 @@
 	newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -325,12 +159,11 @@
 	newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision:  newNPR,
-		Compress:  pr.Compress,
-		kvStore:   pr.kvStore,
-		events:    pr.events,
-		Version:   pr.getVersion(),
-		isWatched: pr.isWatched,
+		Revision: newNPR,
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+		events:   pr.events,
+		Version:  pr.getVersion(),
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
@@ -357,11 +190,6 @@
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 	if pr.kvStore != nil && txid == "" {
-		if pr.isWatched {
-			pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
-			pr.isWatched = false
-		}
-
 		if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
 			log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
 		} else {
@@ -412,7 +240,6 @@
 			updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
 
 			updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
-			updatedChildRev.SetupWatch(ctx, updatedChildRev.GetName())
 			updatedChildRev.SetLastUpdate()
 			updatedChildRev.(*PersistedRevision).setVersion(version)
 
@@ -482,7 +309,6 @@
 
 		// We need to start watching this entry for future changes
 		childRev.SetName(typeName + "/" + keyValue)
-		childRev.SetupWatch(ctx, childRev.GetName())
 		childRev.(*PersistedRevision).setVersion(version)
 
 		// Add entry to cache