VOL-1509 : Partial fix for merging issue
- Changed channel map in etcd to a sync.Map
- Changed graph boundaryPorts to sync.Map
- Added logic to check if proxy access is currently reserved
- Changed watch logic to exit when proxy access is in progress
- Fixed UpdateAllChildren method
- Commented out the Drop operation again in node.go
Change-Id: I8a61798e907be0ff6b0785dcc70721708308611d
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 0ecb5ef..f335216 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -40,6 +40,7 @@
mutex sync.RWMutex `json:"-"`
isStored bool
isWatched bool
+ watchName string
}
// NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -96,8 +97,9 @@
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key})
+ log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ pr.watchName = key
pr.events = pr.kvStore.CreateWatch(key)
pr.isWatched = true
@@ -107,7 +109,61 @@
}
}
+//func (pr *PersistedRevision) mergeWithMemory(pacBlock bool) Revision {
+// if pair, err := pr.kvStore.Get(pr.GetHash()); err != nil {
+// log.Debugw("merge-with-memory--error-occurred", log.Fields{"hash": pr.GetHash(), "error": err})
+// } else if pair == nil {
+// log.Debugw("merge-with-memory--no-data-to-merge", log.Fields{"hash": pr.GetHash()})
+// } else {
+// data := reflect.New(reflect.TypeOf(pr.GetData()).Elem()).Interface()
+//
+// if err := proto.Unmarshal(pair.Value.([]byte), data.(proto.Message)); err != nil {
+// log.Errorw("merge-with-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+// } else {
+// if pr.GetNode().GetProxy() != nil && pacBlock {
+// var pac *proxyAccessControl
+// var pathLock string
+//
+// pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+// log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+//
+// pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+// pac.SetProxy(pr.GetNode().GetProxy())
+// pac.lock()
+//
+// defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+// defer pac.unlock()
+// defer PAC().ReleasePath(pathLock)
+// }
+// //Prepare the transaction
+// branch := pr.GetBranch()
+// //latest := branch.GetLatest()
+// txidBin, _ := uuid.New().MarshalBinary()
+// txid := hex.EncodeToString(txidBin)[:12]
+//
+// makeBranch := func(node *node) *Branch {
+// return node.MakeBranch(txid)
+// }
+//
+// //Apply the update in a transaction branch
+// updatedRev := pr.GetNode().Update("", data, false, txid, makeBranch)
+// updatedRev.SetHash(pr.GetHash())
+//
+// //Merge the transaction branch in memory
+// if mergedRev, _ := pr.GetNode().MergeBranch(txid, false); mergedRev != nil {
+// branch.SetLatest(mergedRev)
+// return mergedRev
+// }
+// }
+// }
+//
+// return nil
+//}
+
func (pr *PersistedRevision) updateInMemory(data interface{}) {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
var pac *proxyAccessControl
var pathLock string
@@ -116,8 +172,28 @@
// and prevent simultaneous updates to the object in memory
//
if pr.GetNode().GetProxy() != nil {
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+ // If the proxy already has a request in progress, then there is no need to process the watch
+ log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ if PAC().IsReserved(pathLock) {
+ switch pr.GetNode().GetRoot().GetProxy().Operation {
+ case PROXY_ADD:
+ fallthrough
+ case PROXY_REMOVE:
+ fallthrough
+ case PROXY_UPDATE:
+ log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ return
+ default:
+ log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
+ }
+ } else {
+ log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ }
+
+ log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+
pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
pac.SetProxy(pr.GetNode().GetProxy())
pac.lock()
@@ -153,9 +229,6 @@
if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
branch.SetLatest(mergedRev)
}
-
- // The transaction branch must be deleted to free-up memory
- //latest.GetNode().GetRoot().DeleteTxBranch(txid)
}
func (pr *PersistedRevision) startWatching() {
@@ -166,40 +239,40 @@
select {
case event, ok := <-pr.events:
if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
break StopWatchLoop
}
- log.Debugw("received-event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.watchName})
switch event.EventType {
case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
pr.Revision.Drop("", true)
break StopWatchLoop
case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
- if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
- log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ if dataPair, err := pr.kvStore.Get(pr.watchName); err != nil || dataPair == nil {
+ log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
pr.updateInMemory(data.Interface())
}
}
default:
- log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+ log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "type": event.EventType})
}
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
}
func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {