VOL-1687 : Fix wrong in-memory node assignments

- Fix nil pointer with createProxy
- Change the watch loop to avoid restarting when the revision changes

Change-Id: Ie821788f2422d7a2083398c65b9632c65fae001d
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 226fc3c..2ab91b7 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -127,34 +127,31 @@
 
 StopWatchLoop:
 	for {
-		if pr.IsDiscarded() {
-			break StopWatchLoop
-		}
+		latestRev := pr.GetBranch().GetLatest()
 
 		select {
 		case event, ok := <-pr.events:
 			if !ok {
-				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 				break StopWatchLoop
 			}
-
-			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
+			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
 
 			switch event.EventType {
 			case kvstore.DELETE:
-				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 				pr.Revision.Drop("", true)
 				break StopWatchLoop
 
 			case kvstore.PUT:
-				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+				log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
-				data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+				data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
 
 				if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
 				} else {
-					log.Debugw("un-marshaled-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "data": data.Interface()})
+					log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
 
 					var pathLock string
 					var pac *proxyAccessControl
@@ -171,24 +168,26 @@
 						Lease:   0,
 					}
 
-					if pr.GetNode().GetProxy() != nil {
+					if latestRev.GetNode().GetProxy() != nil {
 						//
 						// If a proxy exists for this revision, use it to lock access to the path
 						// and prevent simultaneous updates to the object in memory
 						//
-						pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+						pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
 
 						//If the proxy already has a request in progress, then there is no need to process the watch
-						log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+						log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
 						if PAC().IsReserved(pathLock) {
 							log.Debugw("operation-in-progress", log.Fields{
-								"key":       pr.GetHash(),
-								"path":      pr.GetNode().GetProxy().getFullPath(),
-								"operation": pr.GetNode().GetProxy().Operation.String(),
+								"key":       latestRev.GetHash(),
+								"path":      latestRev.GetNode().GetProxy().getFullPath(),
+								"operation": latestRev.GetNode().GetProxy().Operation.String(),
 							})
 
+							//continue
+
 							// Identify the operation type and determine if the watch event should be applied or not.
-							switch pr.GetNode().GetProxy().Operation {
+							switch latestRev.GetNode().GetProxy().Operation {
 							case PROXY_REMOVE:
 								fallthrough
 
@@ -200,9 +199,9 @@
 								// Therefore, the data of the current event is most likely out-dated
 								// and should be ignored
 								log.Debugw("ignore-watch-event", log.Fields{
-									"key":       pr.GetHash(),
-									"path":      pr.GetNode().GetProxy().getFullPath(),
-									"operation": pr.GetNode().GetProxy().Operation.String(),
+									"key":       latestRev.GetHash(),
+									"path":      latestRev.GetNode().GetProxy().getFullPath(),
+									"operation": latestRev.GetNode().GetProxy().Operation.String(),
 								})
 
 								continue
@@ -216,41 +215,45 @@
 							case PROXY_GET:
 								fallthrough
 
+							case PROXY_WATCH:
+								fallthrough
+
 							default:
 								log.Debugw("process-watch-event", log.Fields{
-									"key":       pr.GetHash(),
-									"path":      pr.GetNode().GetProxy().getFullPath(),
-									"operation": pr.GetNode().GetProxy().Operation.String(),
+									"key":       latestRev.GetHash(),
+									"path":      latestRev.GetNode().GetProxy().getFullPath(),
+									"operation": latestRev.GetNode().GetProxy().Operation.String(),
 								})
 							}
 						}
 
 						// Reserve the path to prevent others to modify while we reload from persistence
-						log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
-						pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+						log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
+						pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
+							latestRev.GetNode().GetProxy(), pathLock)
 						pac.lock()
-						pr.GetNode().GetProxy().Operation = PROXY_UPDATE
-						pac.SetProxy(pr.GetNode().GetProxy())
+						latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
+						pac.SetProxy(latestRev.GetNode().GetProxy())
 
 						// Load changes and apply to memory
-						pr.LoadFromPersistence(pr.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
 
-						log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+						log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
 						pac.getProxy().Operation = PROXY_GET
 						pac.unlock()
 						PAC().ReleasePath(pathLock)
 
 					} else {
 						// This block should be reached only if coming from a non-proxied request
-						log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+						log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
 						// Load changes and apply to memory
-						pr.LoadFromPersistence(pr.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
 					}
 				}
 
 			default:
-				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
+				log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
 			}
 		}
 	}
@@ -267,19 +270,18 @@
 	newNPR := pr.Revision.UpdateData(data, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -294,18 +296,17 @@
 	newNPR := pr.Revision.UpdateChildren(name, children, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -319,18 +320,17 @@
 	newNPR := pr.Revision.UpdateAllChildren(children, branch)
 
 	newPR := &PersistedRevision{
-		Revision: newNPR,
-		Compress: pr.Compress,
-		kvStore:  pr.kvStore,
-		events:   pr.events,
+		Revision:  newNPR,
+		Compress:  pr.Compress,
+		kvStore:   pr.kvStore,
+		events:    pr.events,
+		isWatched: pr.isWatched,
 	}
 
 	if newPR.GetHash() != pr.GetHash() {
-		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
 	} else {
-		newPR.isWatched = true
 		newPR.isStored = true
 	}
 
@@ -388,6 +388,7 @@
 	if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
 		// A child matching the provided key exists in memory
 		// Verify if the data differs from what was retrieved from persistence
+		// Also check if we are treating a newer revision of the data or not
 		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
 			log.Debugw("revision-data-is-different", log.Fields{
 				"key":  childRev.GetHash(),
@@ -407,11 +408,11 @@
 
 			updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
 			updatedChildRev.SetupWatch(updatedChildRev.GetName())
-			childRev.Drop(txid, false)
 			updatedChildRev.SetLastUpdate()
 
 			// Update cache
 			GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+			childRev.Drop(txid, false)
 
 			childRev.GetBranch().LatestLock.Unlock()
 			// END lock child
@@ -459,6 +460,7 @@
 				response = childRev
 			}
 		}
+
 	} else {
 		// There is no available child with that key value.
 		// Create a new child and update the parent revision.
@@ -478,7 +480,6 @@
 		childRev.SetName(typeName + "/" + keyValue)
 		childRev.SetupWatch(childRev.GetName())
 
-		pr.GetBranch().Node.makeLatest(pr.GetBranch(), childRev, nil)
 		pr.GetBranch().LatestLock.Unlock()
 		// END child lock
 
@@ -491,7 +492,6 @@
 		children = append(children, childRev)
 		updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
 		updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
-
 		parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
 		parent.GetBranch(NONE).LatestLock.Unlock()
 		// END parent lock
@@ -512,8 +512,7 @@
 
 // LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
 // by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(
-	path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 
@@ -575,8 +574,7 @@
 						// based on the field's key attribute
 						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
 
-						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
-							txid); entry != nil {
+						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
 							response = append(response, entry)
 						}
 					} else {
@@ -603,8 +601,7 @@
 					}
 					keyValue := field.KeyFromStr(key)
 
-					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
-						txid); entry != nil {
+					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
 						response = append(response, entry)
 					}
 				}