VOL-1497 : Further improved data synchronization between cores

- Introduced locking when modifying branches
- Introduced locking when modifying revision children
- Rewrote persistence loading logic to avoid unnecessary changes (see
  the sketch after this list)
- Added access control to CreateProxy to ensure a proxy is not created
  against an incomplete device entry
- Removed locking logic from etcd client
- Replaced revision merging logic with persistence loading
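
As context for the persistence-loading bullets above, the intent is to
refresh memory from the kv store while leaving unchanged entries alone.
Below is a minimal stand-alone sketch of that compare-before-update
idea, using illustrative types only (not the actual voltha-go
Revision/node API):

  package main

  import "fmt"

  // child is a stand-in for an in-memory child revision.
  type child struct {
    key  string
    data string // stand-in for the unmarshalled config blob
  }

  // refreshFromPersistence merges persisted blobs into the in-memory
  // children: missing entries are added, changed entries are replaced,
  // and identical entries are left untouched so no unnecessary
  // revision is created.
  func refreshFromPersistence(inMemory map[string]child, persisted map[string]string) []child {
    var changed []child
    for key, blob := range persisted {
      existing, ok := inMemory[key]
      switch {
      case !ok:
        // No child with that key in memory: create it.
        inMemory[key] = child{key: key, data: blob}
        changed = append(changed, inMemory[key])
      case existing.data != blob:
        // Data differs from persistence: replace the child entry.
        inMemory[key] = child{key: key, data: blob}
        changed = append(changed, inMemory[key])
      default:
        // Same data: skip, nothing to update.
      }
    }
    return changed
  }

  func main() {
    inMemory := map[string]child{"olt-1": {key: "olt-1", data: "state=ENABLED"}}
    persisted := map[string]string{
      "olt-1": "state=ENABLED",  // unchanged: ignored
      "olt-2": "state=DISABLED", // missing in memory: added
    }
    for _, c := range refreshFromPersistence(inMemory, persisted) {
      fmt.Println("refreshed:", c.key)
    }
  }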

VOL-1544 : Cleanup revisions to improve overall performance

- Ensured that old revisions are discarded
- Ensured that children do not contain discarded revisions (see the
  sketch after this list)
- Disabled cache logic for now
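
The cleanup bullets above come down to replacing a superseded child in
the parent's children list and handing the old revision back to the
caller so it can be dropped, keeping discarded revisions out of the
children. A minimal stand-alone sketch of that replace-and-drop
pattern, again with illustrative types rather than the actual Revision
interface:

  package main

  import "fmt"

  // revision is a stand-in for a child revision identified by its hash.
  type revision struct {
    hash string
    data string
  }

  // parentNode is a stand-in for a parent revision holding children per
  // type name.
  type parentNode struct {
    children map[string][]*revision
  }

  // replaceChild swaps the child identified by oldHash for newRev under
  // typeName and returns the superseded revision so the caller can drop
  // it (and stop any watch attached to it).
  func (p *parentNode) replaceChild(typeName, oldHash string, newRev *revision) *revision {
    for i, child := range p.children[typeName] {
      if child.hash == oldHash {
        dropped := child
        p.children[typeName][i] = newRev
        return dropped
      }
    }
    // No child with that hash exists: simply add the new revision.
    p.children[typeName] = append(p.children[typeName], newRev)
    return nil
  }

  func main() {
    p := &parentNode{children: map[string][]*revision{
      "devices": {{hash: "aaa", data: "v1"}},
    }}
    if dropped := p.replaceChild("devices", "aaa", &revision{hash: "bbb", data: "v2"}); dropped != nil {
      fmt.Println("dropped discarded revision:", dropped.hash)
    }
    fmt.Println("children count:", len(p.children["devices"]))
  }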

Change-Id: I1b952c82aba379fce64a47a71b5309a6f28fb5ff
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index c2a6c64..cf7ff9e 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,9 +19,7 @@
 import (
 	"bytes"
 	"compress/gzip"
-	"encoding/hex"
 	"github.com/golang/protobuf/proto"
-	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"reflect"
@@ -42,6 +40,20 @@
 	isWatched bool
 }
 
+type watchCache struct {
+	Cache sync.Map
+}
+
+var watchCacheInstance *watchCache
+var watchCacheOne sync.Once
+
+func Watches() *watchCache {
+	watchCacheOne.Do(func() {
+		watchCacheInstance = &watchCache{Cache: sync.Map{}}
+	})
+	return watchCacheInstance
+}
+
 // NewPersistedRevision creates a new instance of a PersistentRevision structure
 func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
 	pr := &PersistedRevision{}
@@ -55,11 +67,6 @@
 	pr.store(skipOnExist)
 }
 
-type revData struct {
-	Children map[string][]string
-	Config   string
-}
-
 func (pr *PersistedRevision) store(skipOnExist bool) {
 	if pr.GetBranch().Txid != "" {
 		return
@@ -92,97 +99,43 @@
 }
 
 func (pr *PersistedRevision) SetupWatch(key string) {
+	if key == "" {
+		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+		return
+	}
+
+	if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+		return
+	}
+
 	if pr.events == nil {
 		pr.events = make(chan *kvstore.Event)
 
-		log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
 
 		pr.SetName(key)
 		pr.events = pr.kvStore.CreateWatch(key)
+	}
 
+	if !pr.isWatched {
 		pr.isWatched = true
 
+		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+
 		// Start watching
 		go pr.startWatching()
 	}
 }
 
-func (pr *PersistedRevision) updateInMemory(data interface{}) {
-	pr.mutex.Lock()
-	defer pr.mutex.Unlock()
-
-	var pac *proxyAccessControl
-	var pathLock string
-
-	//
-	// If a proxy exists for this revision, use it to lock access to the path
-	// and prevent simultaneous updates to the object in memory
-	//
-	if pr.GetNode().GetProxy() != nil {
-		pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
-
-		// If the proxy already has a request in progress, then there is no need to process the watch
-		log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
-		if PAC().IsReserved(pathLock) {
-			switch pr.GetNode().GetRoot().GetProxy().Operation {
-			case PROXY_ADD:
-				fallthrough
-			case PROXY_REMOVE:
-				fallthrough
-			case PROXY_UPDATE:
-				log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
-				return
-			default:
-				log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
-			}
-		} else {
-			log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
-		}
-
-		log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
-
-		pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
-		pac.SetProxy(pr.GetNode().GetProxy())
-		pac.lock()
-
-		defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
-		defer pac.unlock()
-		defer PAC().ReleasePath(pathLock)
-	}
-
-	//
-	// Update the object in memory through a transaction
-	// This will allow for the object to be subsequently merged with any changes
-	// that might have occurred in memory
-	//
-
-	log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
-
-	// Prepare the transaction
-	branch := pr.GetBranch()
-	latest := branch.GetLatest()
-	txidBin, _ := uuid.New().MarshalBinary()
-	txid := hex.EncodeToString(txidBin)[:12]
-
-	makeBranch := func(node *node) *Branch {
-		return node.MakeBranch(txid)
-	}
-
-	// Apply the update in a transaction branch
-	updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
-	updatedRev.SetName(latest.GetName())
-
-	// Merge the transaction branch in memory
-	if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
-		branch.SetLatest(mergedRev)
-	}
-}
-
 func (pr *PersistedRevision) startWatching() {
-	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
 
 StopWatchLoop:
 	for {
+		if pr.IsDiscarded() {
+			break StopWatchLoop
+		}
+
 		select {
 		case event, ok := <-pr.events:
 			if !ok {
@@ -209,7 +162,9 @@
 					if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
 						log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
 					} else {
-						pr.updateInMemory(data.Interface())
+						if pr.GetNode().GetProxy() != nil {
+							pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+						}
 					}
 				}
 
@@ -219,110 +174,9 @@
 		}
 	}
 
-	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
+	Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
 
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
-	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
-
-	var response []Revision
-	var rev Revision
-
-	rev = pr
-
-	if pr.kvStore != nil && path != "" {
-		blobMap, _ := pr.kvStore.List(path)
-
-		partition := strings.SplitN(path, "/", 2)
-		name := partition[0]
-
-		if len(partition) < 2 {
-			path = ""
-		} else {
-			path = partition[1]
-		}
-
-		field := ChildrenFields(rev.GetBranch().Node.Type)[name]
-
-		if field != nil && field.IsContainer {
-			var children []Revision
-			children = make([]Revision, len(rev.GetChildren(name)))
-			copy(children, rev.GetChildren(name))
-			existChildMap := make(map[string]int)
-			for i, child := range rev.GetChildren(name) {
-				existChildMap[child.GetHash()] = i
-			}
-
-			for _, blob := range blobMap {
-				output := blob.Value.([]byte)
-
-				data := reflect.New(field.ClassType.Elem())
-
-				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
-					log.Errorw(
-						"loading-from-persistence--failed-to-unmarshal",
-						log.Fields{"path": path, "txid": txid, "error": err},
-					)
-				} else if field.Key != "" {
-					var key reflect.Value
-					var keyValue interface{}
-					var keyStr string
-
-					if path == "" {
-						// e.g. /logical_devices --> path="" name=logical_devices key=""
-						_, key = GetAttributeValue(data.Interface(), field.Key, 0)
-						keyStr = key.String()
-
-					} else {
-						// e.g.
-						// /logical_devices/abcde --> path="abcde" name=logical_devices
-						// /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
-
-						partition := strings.SplitN(path, "/", 2)
-						key := partition[0]
-						if len(partition) < 2 {
-							path = ""
-						} else {
-							path = partition[1]
-						}
-						keyValue = field.KeyFromStr(key)
-						keyStr = keyValue.(string)
-
-						if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
-							// Key is memory, continue recursing path
-							if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
-								children[idx] = newChildRev[0]
-
-								rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
-								rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
-								response = append(response, newChildRev[0])
-								continue
-							}
-						}
-					}
-
-					childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
-					childRev.SetName(name + "/" + keyStr)
-
-					// Do not process a child that is already in memory
-					if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
-						// Create watch for <component>/<key>
-						childRev.SetupWatch(childRev.GetName())
-
-						children = append(children, childRev)
-						rev = rev.UpdateChildren(name, children, rev.GetBranch())
-
-						rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-					}
-					response = append(response, childRev)
-					continue
-				}
-			}
-		}
-	}
-
-	return response
+	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
 }
 
 // UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -335,6 +189,17 @@
 		Revision: newNPR,
 		Compress: pr.Compress,
 		kvStore:  pr.kvStore,
+		events:   pr.events,
+	}
+
+	if newPR.GetHash() != pr.GetHash() {
+		newPR.isWatched = false
+		newPR.isStored = false
+		pr.Drop(branch.Txid, false)
+		newPR.SetupWatch(newPR.GetName())
+	} else {
+		newPR.isWatched = true
+		newPR.isStored = true
 	}
 
 	return newPR
@@ -350,6 +215,17 @@
 		Revision: newNPR,
 		Compress: pr.Compress,
 		kvStore:  pr.kvStore,
+		events:   pr.events,
+	}
+
+	if newPR.GetHash() != pr.GetHash() {
+		newPR.isWatched = false
+		newPR.isStored = false
+		pr.Drop(branch.Txid, false)
+		newPR.SetupWatch(newPR.GetName())
+	} else {
+		newPR.isWatched = true
+		newPR.isStored = true
 	}
 
 	return newPR
@@ -365,6 +241,17 @@
 		Revision: newNPR,
 		Compress: pr.Compress,
 		kvStore:  pr.kvStore,
+		events:   pr.events,
+	}
+
+	if newPR.GetHash() != pr.GetHash() {
+		newPR.isWatched = false
+		newPR.isStored = false
+		pr.Drop(branch.Txid, false)
+		newPR.SetupWatch(newPR.GetName())
+	} else {
+		newPR.isWatched = true
+		newPR.isStored = true
 	}
 
 	return newPR
@@ -407,3 +294,182 @@
 
 	pr.Revision.Drop(txid, includeConfig)
 }
+
+// verifyPersistedEntry checks whether the provided data is already present in memory and applies updates as required
+func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+	rev := pr
+
+	children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
+	copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+
+	// Verify if the revision contains a child that matches that key
+	if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+		// A child matching the provided key exists in memory
+		// Verify if the data differs to what was retrieved from persistence
+		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+			log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
+				"key":  childRev.GetHash(),
+				"name": childRev.GetName(),
+			})
+
+			// Data has changed; replace the child entry and update the parent revision
+			updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+			updatedChildRev.SetupWatch(updatedChildRev.GetName())
+			childRev.Drop(txid, false)
+
+			if childIdx >= 0 {
+				children[childIdx] = updatedChildRev
+			} else {
+				children = append(children, updatedChildRev)
+			}
+
+			rev.GetBranch().LatestLock.Lock()
+			updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
+			rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+			rev.GetBranch().LatestLock.Unlock()
+
+			// Drop the previous child revision
+			rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+
+			if updatedChildRev != nil {
+				log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+					"key":  updatedChildRev.GetHash(),
+					"name": updatedChildRev.GetName(),
+				})
+				response = updatedChildRev
+			}
+		} else {
+			// Data is the same. Continue to the next entry
+			log.Debugw("verify-persisted-entry--same-data", log.Fields{
+				"key":  childRev.GetHash(),
+				"name": childRev.GetName(),
+			})
+			if childRev != nil {
+				log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
+					"key":  childRev.GetHash(),
+					"name": childRev.GetName(),
+				})
+				response = childRev
+			}
+		}
+	} else {
+		// There is no available child with that key value.
+		// Create a new child and update the parent revision.
+		log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
+			"key":  keyValue,
+			"name": typeName,
+		})
+
+		// Construct a new child node with the retrieved persistence data
+		childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+
+		// We need to start watching this entry for future changes
+		childRev.SetName(typeName + "/" + keyValue)
+
+		// Add the child to the parent revision
+		rev.GetBranch().LatestLock.Lock()
+		children = append(children, childRev)
+		updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+		childRev.SetupWatch(childRev.GetName())
+
+		//rev.GetBranch().Node.Latest().Drop(txid, false)
+		rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+		rev.GetBranch().LatestLock.Unlock()
+
+		// Child entry is valid and can be included in the response object
+		if childRev != nil {
+			log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+				"key":  childRev.GetHash(),
+				"name": childRev.GetName(),
+			})
+			response = childRev
+		}
+	}
+
+	return response
+}
+
+// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
+// by adding missing entries, updating changed entries and ignoring unchanged ones
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+	pr.mutex.Lock()
+	defer pr.mutex.Unlock()
+
+	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
+	var response []Revision
+	var rev Revision
+
+	rev = pr
+
+	if pr.kvStore != nil && path != "" {
+		blobMap, _ := pr.kvStore.List(path)
+
+		partition := strings.SplitN(path, "/", 2)
+		name := partition[0]
+
+		if len(partition) < 2 {
+			path = ""
+		} else {
+			path = partition[1]
+		}
+
+		field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+		if field != nil && field.IsContainer {
+			log.Debugw("load-from-persistence--start-blobs", log.Fields{
+				"path": path,
+				"name": name,
+				"size": len(blobMap),
+			})
+
+			for _, blob := range blobMap {
+				output := blob.Value.([]byte)
+
+				data := reflect.New(field.ClassType.Elem())
+
+				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+					log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
+						"path":  path,
+						"txid":  txid,
+						"error": err,
+					})
+				} else if path == "" {
+					if field.Key != "" {
+						// Retrieve the key identifier value from the data structure
+						// based on the field's key attribute
+						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+							response = append(response, entry)
+						}
+					}
+
+				} else if field.Key != "" {
+					// The request is for a specific entry/id
+					partition := strings.SplitN(path, "/", 2)
+					key := partition[0]
+					if len(partition) < 2 {
+						path = ""
+					} else {
+						path = partition[1]
+					}
+					keyValue := field.KeyFromStr(key)
+
+					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+						response = append(response, entry)
+					}
+				}
+			}
+
+			log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
+		} else {
+			log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
+				"type": rev.GetBranch().Node.Type,
+				"name": name,
+			})
+		}
+	}
+
+	return response
+}
\ No newline at end of file