VOL-1687 : Fix wrong in-memory node assignments
- Fixed nil pointer with createProxy
- Changed watch loop to avoid re-starting when rev changes
Change-Id: Ie821788f2422d7a2083398c65b9632c65fae001d
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 226fc3c..2ab91b7 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -127,34 +127,31 @@
StopWatchLoop:
for {
- if pr.IsDiscarded() {
- break StopWatchLoop
- }
+ latestRev := pr.GetBranch().GetLatest()
select {
case event, ok := <-pr.events:
if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
break StopWatchLoop
}
-
- log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
switch event.EventType {
case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
pr.Revision.Drop("", true)
break StopWatchLoop
case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
- data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+ data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+ log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
} else {
- log.Debugw("un-marshaled-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "data": data.Interface()})
+ log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
var pathLock string
var pac *proxyAccessControl
@@ -171,24 +168,26 @@
Lease: 0,
}
- if pr.GetNode().GetProxy() != nil {
+ if latestRev.GetNode().GetProxy() != nil {
//
// If a proxy exists for this revision, use it to lock access to the path
// and prevent simultaneous updates to the object in memory
//
- pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+ pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
//If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
if PAC().IsReserved(pathLock) {
log.Debugw("operation-in-progress", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
+ //continue
+
// Identify the operation type and determine if the watch event should be applied or not.
- switch pr.GetNode().GetProxy().Operation {
+ switch latestRev.GetNode().GetProxy().Operation {
case PROXY_REMOVE:
fallthrough
@@ -200,9 +199,9 @@
// Therefore, the data of the current event is most likely out-dated
// and should be ignored
log.Debugw("ignore-watch-event", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
continue
@@ -216,41 +215,45 @@
case PROXY_GET:
fallthrough
+ case PROXY_WATCH:
+ fallthrough
+
default:
log.Debugw("process-watch-event", log.Fields{
- "key": pr.GetHash(),
- "path": pr.GetNode().GetProxy().getFullPath(),
- "operation": pr.GetNode().GetProxy().Operation.String(),
+ "key": latestRev.GetHash(),
+ "path": latestRev.GetNode().GetProxy().getFullPath(),
+ "operation": latestRev.GetNode().GetProxy().Operation.String(),
})
}
}
// Reserve the path to prevent others to modify while we reload from persistence
- log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
- pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+ log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
+ pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
+ latestRev.GetNode().GetProxy(), pathLock)
pac.lock()
- pr.GetNode().GetProxy().Operation = PROXY_UPDATE
- pac.SetProxy(pr.GetNode().GetProxy())
+ latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
+ pac.SetProxy(latestRev.GetNode().GetProxy())
// Load changes and apply to memory
- pr.LoadFromPersistence(pr.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
- log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
pac.getProxy().Operation = PROXY_GET
pac.unlock()
PAC().ReleasePath(pathLock)
} else {
// This block should be reached only if coming from a non-proxied request
- log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+ log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
// Load changes and apply to memory
- pr.LoadFromPersistence(pr.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
}
}
default:
- log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
+ log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
}
}
}
@@ -267,19 +270,18 @@
newNPR := pr.Revision.UpdateData(data, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -294,18 +296,17 @@
newNPR := pr.Revision.UpdateChildren(name, children, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -319,18 +320,17 @@
newNPR := pr.Revision.UpdateAllChildren(children, branch)
newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ isWatched: pr.isWatched,
}
if newPR.GetHash() != pr.GetHash() {
- newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
} else {
- newPR.isWatched = true
newPR.isStored = true
}
@@ -388,6 +388,7 @@
if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
// A child matching the provided key exists in memory
// Verify if the data differs from what was retrieved from persistence
+ // Also check if we are treating a newer revision of the data or not
if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
log.Debugw("revision-data-is-different", log.Fields{
"key": childRev.GetHash(),
@@ -407,11 +408,11 @@
updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
- childRev.Drop(txid, false)
updatedChildRev.SetLastUpdate()
// Update cache
GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+ childRev.Drop(txid, false)
childRev.GetBranch().LatestLock.Unlock()
// END lock child
@@ -459,6 +460,7 @@
response = childRev
}
}
+
} else {
// There is no available child with that key value.
// Create a new child and update the parent revision.
@@ -478,7 +480,6 @@
childRev.SetName(typeName + "/" + keyValue)
childRev.SetupWatch(childRev.GetName())
- pr.GetBranch().Node.makeLatest(pr.GetBranch(), childRev, nil)
pr.GetBranch().LatestLock.Unlock()
// END child lock
@@ -491,7 +492,6 @@
children = append(children, childRev)
updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
-
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
// END parent lock
@@ -512,8 +512,7 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(
- path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -575,8 +574,7 @@
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
- txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
response = append(response, entry)
}
} else {
@@ -603,8 +601,7 @@
}
keyValue := field.KeyFromStr(key)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
- txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
response = append(response, entry)
}
}