VOL-1509 : Partial fix for merging issue

- Changed channel map in etcd to a sync.Map
- Changed graph boundaryPorts to a sync.Map
- Added logic to check if proxy access is currently reserved
- Changed watch logic to exit when proxy access is in progress
- Fixed UpdateAllChildren method
- Commented out the Drop operation again in node.go

Change-Id: I8a61798e907be0ff6b0785dcc70721708308611d
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 0ecb5ef..f335216 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -40,6 +40,7 @@
 	mutex     sync.RWMutex        `json:"-"`
 	isStored  bool
 	isWatched bool
+	watchName string
 }
 
 // NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -96,8 +97,9 @@
 	if pr.events == nil {
 		pr.events = make(chan *kvstore.Event)
 
-		log.Debugw("setting-watch", log.Fields{"key": key})
+		log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 
+		pr.watchName = key
 		pr.events = pr.kvStore.CreateWatch(key)
 
 		pr.isWatched = true
@@ -107,7 +109,61 @@
 	}
 }
 
+//func (pr *PersistedRevision) mergeWithMemory(pacBlock bool) Revision {
+//	if pair, err := pr.kvStore.Get(pr.GetHash()); err != nil {
+//		log.Debugw("merge-with-memory--error-occurred", log.Fields{"hash": pr.GetHash(), "error": err})
+//	} else if pair == nil {
+//		log.Debugw("merge-with-memory--no-data-to-merge", log.Fields{"hash": pr.GetHash()})
+//	} else {
+//		data := reflect.New(reflect.TypeOf(pr.GetData()).Elem()).Interface()
+//
+//		if err := proto.Unmarshal(pair.Value.([]byte), data.(proto.Message)); err != nil {
+//			log.Errorw("merge-with-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+//		} else {
+//			if pr.GetNode().GetProxy() != nil && pacBlock {
+//				var pac *proxyAccessControl
+//				var pathLock string
+//
+//				pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+//				log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+//
+//				pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+//				pac.SetProxy(pr.GetNode().GetProxy())
+//				pac.lock()
+//
+//				defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+//				defer pac.unlock()
+//				defer PAC().ReleasePath(pathLock)
+//			}
+//			//Prepare the transaction
+//			branch := pr.GetBranch()
+//			//latest := branch.GetLatest()
+//			txidBin, _ := uuid.New().MarshalBinary()
+//			txid := hex.EncodeToString(txidBin)[:12]
+//
+//			makeBranch := func(node *node) *Branch {
+//				return node.MakeBranch(txid)
+//			}
+//
+//			//Apply the update in a transaction branch
+//			updatedRev := pr.GetNode().Update("", data, false, txid, makeBranch)
+//			updatedRev.SetHash(pr.GetHash())
+//
+//			//Merge the transaction branch in memory
+//			if mergedRev, _ := pr.GetNode().MergeBranch(txid, false); mergedRev != nil {
+//				branch.SetLatest(mergedRev)
+//				return mergedRev
+//			}
+//		}
+//	}
+//
+//	return nil
+//}
+
 func (pr *PersistedRevision) updateInMemory(data interface{}) {
+	pr.mutex.Lock()
+	defer pr.mutex.Unlock()
+
 	var pac *proxyAccessControl
 	var pathLock string
 
@@ -116,8 +172,28 @@
 	// and prevent simultaneous updates to the object in memory
 	//
 	if pr.GetNode().GetProxy() != nil {
-		log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
 		pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+		// If the proxy already has a request in progress, then there is no need to process the watch
+		log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+		if PAC().IsReserved(pathLock) {
+			switch pr.GetNode().GetRoot().GetProxy().Operation {
+			case PROXY_ADD:
+				fallthrough
+			case PROXY_REMOVE:
+				fallthrough
+			case PROXY_UPDATE:
+				log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+				return
+			default:
+				log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
+			}
+		} else {
+			log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+		}
+
+		log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+
 		pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
 		pac.SetProxy(pr.GetNode().GetProxy())
 		pac.lock()
@@ -153,9 +229,6 @@
 	if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
 		branch.SetLatest(mergedRev)
 	}
-
-	// The transaction branch must be deleted to free-up memory
-	//latest.GetNode().GetRoot().DeleteTxBranch(txid)
 }
 
 func (pr *PersistedRevision) startWatching() {
@@ -166,40 +239,40 @@
 		select {
 		case event, ok := <-pr.events:
 			if !ok {
-				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
 				break StopWatchLoop
 			}
 
-			log.Debugw("received-event", log.Fields{"type": event.EventType})
+			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.watchName})
 
 			switch event.EventType {
 			case kvstore.DELETE:
-				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
 				pr.Revision.Drop("", true)
 				break StopWatchLoop
 
 			case kvstore.PUT:
-				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
 
-				if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
-					log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+				if dataPair, err := pr.kvStore.Get(pr.watchName); err != nil || dataPair == nil {
+					log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
 				} else {
 					data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
 
 					if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-						log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+						log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
 					} else {
 						pr.updateInMemory(data.Interface())
 					}
 				}
 
 			default:
-				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "type": event.EventType})
 			}
 		}
 	}
 
-	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
 }
 
 func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {