VOL-1497 : Further improved data synchronization between cores
- Introduced locking when modifying branches
- Introduced locking when modifying rev children
- Rewrote persistence loading logic to avoid unnecessary changes
- Access controlled CreateProxy to ensure a proxy is not created
against an incomplete device entry
- Removed locking logic from etcd client
- Replaced revision merging logic with persistence loading
VOL-1544 : Cleanup revisions to improve overall performance
- Ensure that old revisions are discarded
- Ensure that children do not contain discarded revisions
- Disabled cache logic for now
Change-Id: I1b952c82aba379fce64a47a71b5309a6f28fb5ff
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index c2a6c64..cf7ff9e 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,9 +19,7 @@
import (
"bytes"
"compress/gzip"
- "encoding/hex"
"github.com/golang/protobuf/proto"
- "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -42,6 +40,20 @@
isWatched bool
}
+type watchCache struct {
+ Cache sync.Map
+}
+
+var watchCacheInstance *watchCache
+var watchCacheOne sync.Once
+
+func Watches() *watchCache {
+ watchCacheOne.Do(func() {
+ watchCacheInstance = &watchCache{Cache: sync.Map{}}
+ })
+ return watchCacheInstance
+}
+
// NewPersistedRevision creates a new instance of a PersistentRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
@@ -55,11 +67,6 @@
pr.store(skipOnExist)
}
-type revData struct {
- Children map[string][]string
- Config string
-}
-
func (pr *PersistedRevision) store(skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
@@ -92,97 +99,43 @@
}
func (pr *PersistedRevision) SetupWatch(key string) {
+ if key == "" {
+ log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ return
+ }
+
+ if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+ return
+ }
+
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
pr.SetName(key)
pr.events = pr.kvStore.CreateWatch(key)
+ }
+ if !pr.isWatched {
pr.isWatched = true
+ log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+
// Start watching
go pr.startWatching()
}
}
-func (pr *PersistedRevision) updateInMemory(data interface{}) {
- pr.mutex.Lock()
- defer pr.mutex.Unlock()
-
- var pac *proxyAccessControl
- var pathLock string
-
- //
- // If a proxy exists for this revision, use it to lock access to the path
- // and prevent simultaneous updates to the object in memory
- //
- if pr.GetNode().GetProxy() != nil {
- pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
-
- // If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
- if PAC().IsReserved(pathLock) {
- switch pr.GetNode().GetRoot().GetProxy().Operation {
- case PROXY_ADD:
- fallthrough
- case PROXY_REMOVE:
- fallthrough
- case PROXY_UPDATE:
- log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- return
- default:
- log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
- }
- } else {
- log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- }
-
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
-
- pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
- pac.SetProxy(pr.GetNode().GetProxy())
- pac.lock()
-
- defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
- defer pac.unlock()
- defer PAC().ReleasePath(pathLock)
- }
-
- //
- // Update the object in memory through a transaction
- // This will allow for the object to be subsequently merged with any changes
- // that might have occurred in memory
- //
-
- log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
-
- // Prepare the transaction
- branch := pr.GetBranch()
- latest := branch.GetLatest()
- txidBin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txidBin)[:12]
-
- makeBranch := func(node *node) *Branch {
- return node.MakeBranch(txid)
- }
-
- // Apply the update in a transaction branch
- updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
- updatedRev.SetName(latest.GetName())
-
- // Merge the transaction branch in memory
- if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
- branch.SetLatest(mergedRev)
- }
-}
-
func (pr *PersistedRevision) startWatching() {
- log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
StopWatchLoop:
for {
+ if pr.IsDiscarded() {
+ break StopWatchLoop
+ }
+
select {
case event, ok := <-pr.events:
if !ok {
@@ -209,7 +162,9 @@
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
} else {
- pr.updateInMemory(data.Interface())
+ if pr.GetNode().GetProxy() != nil {
+ pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+ }
}
}
@@ -219,110 +174,9 @@
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
+ Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
- log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
-
- var response []Revision
- var rev Revision
-
- rev = pr
-
- if pr.kvStore != nil && path != "" {
- blobMap, _ := pr.kvStore.List(path)
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(rev.GetBranch().Node.Type)[name]
-
- if field != nil && field.IsContainer {
- var children []Revision
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
- existChildMap := make(map[string]int)
- for i, child := range rev.GetChildren(name) {
- existChildMap[child.GetHash()] = i
- }
-
- for _, blob := range blobMap {
- output := blob.Value.([]byte)
-
- data := reflect.New(field.ClassType.Elem())
-
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- log.Errorw(
- "loading-from-persistence--failed-to-unmarshal",
- log.Fields{"path": path, "txid": txid, "error": err},
- )
- } else if field.Key != "" {
- var key reflect.Value
- var keyValue interface{}
- var keyStr string
-
- if path == "" {
- // e.g. /logical_devices --> path="" name=logical_devices key=""
- _, key = GetAttributeValue(data.Interface(), field.Key, 0)
- keyStr = key.String()
-
- } else {
- // e.g.
- // /logical_devices/abcde --> path="abcde" name=logical_devices
- // /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
-
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue = field.KeyFromStr(key)
- keyStr = keyValue.(string)
-
- if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
- // Key is memory, continue recursing path
- if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
- children[idx] = newChildRev[0]
-
- rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
- response = append(response, newChildRev[0])
- continue
- }
- }
- }
-
- childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
- childRev.SetName(name + "/" + keyStr)
-
- // Do not process a child that is already in memory
- if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
- // Create watch for <component>/<key>
- childRev.SetupWatch(childRev.GetName())
-
- children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, rev.GetBranch())
-
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
- }
- response = append(response, childRev)
- continue
- }
- }
- }
- }
-
- return response
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -335,6 +189,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -350,6 +215,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -365,6 +241,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -407,3 +294,182 @@
pr.Revision.Drop(txid, includeConfig)
}
+
+// verifyPersistedEntry checks whether a child matching the provided key already exists in memory, replacing it when the persisted data differs or adding it when it is missing
+func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+ rev := pr
+
+ children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
+ copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+
+ // Verify if the revision contains a child that matches that key
+ if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+ // A child matching the provided key exists in memory
+ // Verify if the data differs to what was retrieved from persistence
+ if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+ log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+
+ // Data has changed; replace the child entry and update the parent revision
+ updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev.SetupWatch(updatedChildRev.GetName())
+ childRev.Drop(txid, false)
+
+ if childIdx >= 0 {
+ children[childIdx] = updatedChildRev
+ } else {
+ children = append(children, updatedChildRev)
+ }
+
+ rev.GetBranch().LatestLock.Lock()
+ updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Drop the previous child revision
+ rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+
+ if updatedChildRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": updatedChildRev.GetHash(),
+ "name": updatedChildRev.GetName(),
+ })
+ response = updatedChildRev
+ }
+ } else {
+ // Data is the same. Continue to the next entry
+ log.Debugw("verify-persisted-entry--same-data", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+ } else {
+ // There is no available child with that key value.
+ // Create a new child and update the parent revision.
+ log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
+ "key": keyValue,
+ "name": typeName,
+ })
+
+ // Construct a new child node with the retrieved persistence data
+ childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+
+ // We need to start watching this entry for future changes
+ childRev.SetName(typeName + "/" + keyValue)
+
+ // Add the child to the parent revision
+ rev.GetBranch().LatestLock.Lock()
+ children = append(children, childRev)
+ updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+ childRev.SetupWatch(childRev.GetName())
+
+ //rev.GetBranch().Node.Latest().Drop(txid, false)
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Child entry is valid and can be included in the response object
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+
+ return response
+}
+
+// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
+// by adding missing entries, updating changed entries and ignoring unchanged ones
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
+ log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
+ var response []Revision
+ var rev Revision
+
+ rev = pr
+
+ if pr.kvStore != nil && path != "" {
+ blobMap, _ := pr.kvStore.List(path)
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+ if field != nil && field.IsContainer {
+ log.Debugw("load-from-persistence--start-blobs", log.Fields{
+ "path": path,
+ "name": name,
+ "size": len(blobMap),
+ })
+
+ for _, blob := range blobMap {
+ output := blob.Value.([]byte)
+
+ data := reflect.New(field.ClassType.Elem())
+
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
+ "path": path,
+ "txid": txid,
+ "error": err,
+ })
+ } else if path == "" {
+ if field.Key != "" {
+ // Retrieve the key identifier value from the data structure
+ // based on the field's key attribute
+ _, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+
+ } else if field.Key != "" {
+ // The request is for a specific entry/id
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+ }
+
+ log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
+ } else {
+ log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
+ "type": rev.GetBranch().Node.Type,
+ "name": name,
+ })
+ }
+ }
+
+ return response
+}
\ No newline at end of file