VOL-1497 : Further improved data synchronization between cores
- Introduced locking when modifying branches
- Introduced locking when modifying rev children
- Rewrote persistence loading logic to avoid unecessary changes
- Access controlled CreateProxy to ensure a proxy is not created
against an incomplete device entry
- Removed locking logic from etcd client
- Replaced revision merging logic with persistence loading
VOL-1544 : Cleanup revisions to improve overall performance
- Ensure that old revisions are discarded
- Ensure that children do not contain discarded revisions
- Disabled cache logic for now
Change-Id: I1b952c82aba379fce64a47a71b5309a6f28fb5ff
diff --git a/db/model/branch.go b/db/model/branch.go
index 40c66ad..ca89df0 100644
--- a/db/model/branch.go
+++ b/db/model/branch.go
@@ -27,11 +27,12 @@
// Branch structure is used to classify a collection of transaction based revisions
type Branch struct {
sync.RWMutex
- Node *node
- Txid string
- Origin Revision
- Revisions map[string]Revision
- Latest Revision
+ Node *node
+ Txid string
+ Origin Revision
+ Revisions map[string]Revision
+ LatestLock sync.RWMutex
+ Latest Revision
}
// NewBranch creates a new instance of the Branch structure
@@ -46,17 +47,69 @@
return b
}
+// Utility function to extract all children names for a given revision (mostly for debugging purposes)
+func (b *Branch) retrieveChildrenNames(revision Revision) []string {
+ var childrenNames []string
+
+ for _, child := range revision.GetChildren("devices") {
+ childrenNames = append(childrenNames, child.GetName())
+ }
+
+ return childrenNames
+}
+
+// Utility function to compare children names and report the missing ones (mostly for debugging purposes)
+func (b *Branch) findMissingChildrenNames(previousNames, latestNames []string) []string {
+ var missingNames []string
+
+ for _, previousName := range previousNames {
+ found := false
+
+ if len(latestNames) == 0 {
+ break
+ }
+
+ for _, latestName := range latestNames {
+ if previousName == latestName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ missingNames = append(missingNames, previousName)
+ }
+ }
+
+ return missingNames
+}
+
// SetLatest assigns the latest revision for this branch
func (b *Branch) SetLatest(latest Revision) {
b.Lock()
defer b.Unlock()
if b.Latest != nil {
- log.Debugf("Switching latest from <%s> to <%s>", b.Latest.GetHash(), latest.GetHash())
- } else {
- log.Debugf("Switching latest from <NIL> to <%s>", latest.GetHash())
- }
+ log.Debugw("updating-latest-revision", log.Fields{"current": b.Latest.GetHash(), "new": latest.GetHash()})
+ // Go through list of children names in current revision and new revision
+ // and then compare the resulting outputs to ensure that we have not lost any entries.
+ var previousNames, latestNames, missingNames []string
+
+ if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
+ log.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
+ }
+
+ if latestNames = b.retrieveChildrenNames(b.Latest); len(latestNames) > 0 {
+ log.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
+ }
+
+ if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
+ log.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
+ }
+
+ } else {
+ log.Debugw("setting-latest-revision", log.Fields{"new": latest.GetHash()})
+ }
b.Latest = latest
}
@@ -103,3 +156,13 @@
b.Revisions[hash] = revision
}
+
+// DeleteRevision removes a revision with the specified hash
+func (b *Branch) DeleteRevision(hash string) {
+ b.Lock()
+ defer b.Unlock()
+
+ if _, ok := b.Revisions[hash]; ok {
+ delete(b.Revisions, hash)
+ }
+}
diff --git a/db/model/node.go b/db/model/node.go
index 2a9309c..3908c4e 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -117,22 +117,34 @@
n.Lock()
defer n.Unlock()
+ // Keep a reference to the current revision
+ var previous string
+ if branch.GetLatest() != nil {
+ previous = branch.GetLatest().GetHash()
+ }
+
branch.AddRevision(revision)
+ // If anything is new, then set the revision as the latest
if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
branch.SetLatest(revision)
}
+ // Delete the previous revision if anything has changed
+ if previous != "" && previous != branch.GetLatest().GetHash() {
+ branch.DeleteRevision(previous)
+ }
+
if changeAnnouncement != nil && branch.Txid == "" {
if n.Proxy != nil {
for _, change := range changeAnnouncement {
- //log.Debugw("invoking callback",
- // log.Fields{
- // "callbacks": n.Proxy.getCallbacks(change.Type),
- // "type": change.Type,
- // "previousData": change.PreviousData,
- // "latestData": change.LatestData,
- // })
+ log.Debugw("adding-callback",
+ log.Fields{
+ "callbacks": n.Proxy.getCallbacks(change.Type),
+ "type": change.Type,
+ "previousData": change.PreviousData,
+ "latestData": change.LatestData,
+ })
n.Root.AddCallback(
n.Proxy.InvokeCallbacks,
change.Type,
@@ -141,19 +153,6 @@
change.LatestData)
}
}
-
- //for _, change := range changeAnnouncement {
- //log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v",
- // change.Type,
- // change.PreviousData,
- // change.LatestData)
- //n.Root.AddNotificationCallback(
- // n.makeEventBus().Advertise,
- // change.Type,
- // revision.GetHash(),
- // change.PreviousData,
- // change.LatestData)
- //}
}
}
@@ -288,8 +287,7 @@
// Get retrieves the data from a node tree that resides at the specified path
func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
- log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile,
- "txid": txid})
+ log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -473,6 +471,11 @@
copy(children, rev.GetChildren(name))
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+
+ if childRev == nil {
+ return branch.GetLatest()
+ }
+
childNode := childRev.GetNode()
// Save proxy in child node to ensure callbacks are called later on
@@ -502,12 +505,20 @@
// Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
newChildRev.SetName(name + "/" + _keyValueType)
- children[idx] = newChildRev
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
updatedRev := rev.UpdateChildren(name, children, branch)
- branch.GetLatest().Drop(txid, false)
n.makeLatest(branch, updatedRev, nil)
+ updatedRev.ChildDrop(name, childRev.GetHash())
return newChildRev
@@ -518,10 +529,15 @@
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
- rev.Drop(txid, false)
n.makeLatest(branch, updatedRev, nil)
+ updatedRev.ChildDrop(name, childRev.GetHash())
+
return newChildRev
}
@@ -557,11 +573,6 @@
rev := branch.GetLatest().UpdateData(data, branch)
changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
-
- // FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
- // FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- //branch.GetLatest().Drop(branch.Txid, false)
-
n.makeLatest(branch, rev, changes)
return rev
@@ -628,15 +639,16 @@
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
childRev.SetName(name + "/" + key.String())
- // Create watch for <component>/<key>
- childRev.SetupWatch(childRev.GetName())
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, branch)
- changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
- rev.Drop(txid, false)
- n.makeLatest(branch, rev, changes)
+ updatedRev := rev.UpdateChildren(name, children, branch)
+ changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
+ childRev.SetupWatch(childRev.GetName())
+
+ n.makeLatest(branch, updatedRev, changes)
return childRev
}
@@ -657,17 +669,29 @@
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+ if childRev == nil {
+ return branch.GetLatest()
+ }
+
childNode := childRev.GetNode()
newChildRev := childNode.Add(path, data, txid, makeBranch)
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
childRev.SetName(name + "/" + keyValue.(string))
- children[idx] = newChildRev
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
- rev = rev.UpdateChildren(name, children, branch)
- rev.Drop(txid, false)
- n.makeLatest(branch, rev.GetBranch().GetLatest(), nil)
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ updatedRev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, updatedRev, nil)
+
+ updatedRev.ChildDrop(name, childRev.GetHash())
return newChildRev
} else {
@@ -729,20 +753,30 @@
copy(children, rev.GetChildren(name))
if path != "" {
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
- childNode := childRev.GetNode()
- if childNode.Proxy == nil {
- childNode.Proxy = n.Proxy
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ childNode := childRev.GetNode()
+ if childNode.Proxy == nil {
+ childNode.Proxy = n.Proxy
+ }
+ newChildRev := childNode.Remove(path, txid, makeBranch)
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ rev.SetChildren(name, children)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, nil)
}
- newChildRev := childNode.Remove(path, txid, makeBranch)
- children[idx] = newChildRev
- rev.SetChildren(name, children)
- branch.GetLatest().Drop(txid, false)
- n.makeLatest(branch, rev, nil)
- return nil
+ return branch.GetLatest()
}
- if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil && idx >= 0 {
if n.GetProxy() != nil {
data := childRev.GetData()
n.GetProxy().InvokeCallbacks(PRE_REMOVE, false, data)
@@ -752,6 +786,10 @@
}
childRev.StorageDrop(txid, true)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
children = append(children[:idx], children[idx+1:]...)
rev.SetChildren(name, children)
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 3c39e01..d501c66 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -22,34 +22,41 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "runtime/debug"
"sort"
"sync"
)
-type revCacheSingleton struct {
- sync.RWMutex
- Cache map[string]interface{}
-}
-
-var revCacheInstance *revCacheSingleton
-var revCacheOnce sync.Once
-
-func GetRevCache() *revCacheSingleton {
- revCacheOnce.Do(func() {
- revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
- })
- return revCacheInstance
-}
+// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
+//
+//type revCacheSingleton struct {
+// sync.RWMutex
+// //Cache map[string]interface{}
+// Cache sync.Map
+//}
+//
+//var revCacheInstance *revCacheSingleton
+//var revCacheOnce sync.Once
+//
+//func GetRevCache() *revCacheSingleton {
+// revCacheOnce.Do(func() {
+// //revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
+// revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
+// })
+// return revCacheInstance
+//}
type NonPersistedRevision struct {
- mutex sync.RWMutex
- Root *root
- Config *DataRevision
- Children map[string][]Revision
- Hash string
- Branch *Branch
- WeakRef string
- Name string
+ mutex sync.RWMutex
+ Root *root
+ Config *DataRevision
+ childrenLock sync.RWMutex
+ Children map[string][]Revision
+ Hash string
+ Branch *Branch
+ WeakRef string
+ Name string
+ discarded bool
}
func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
@@ -59,9 +66,14 @@
r.Config = NewDataRevision(root, data)
r.Children = children
r.Hash = r.hashContent()
+ r.discarded = false
return r
}
+func (npr *NonPersistedRevision) IsDiscarded() bool {
+ return npr.discarded
+}
+
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -75,28 +87,35 @@
}
func (npr *NonPersistedRevision) SetAllChildren(children map[string][]Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Children = children
-}
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+ npr.Children = make(map[string][]Revision)
-func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- if _, exists := npr.Children[name]; exists {
- npr.Children[name] = children
+ for key, value := range children {
+ npr.Children[key] = make([]Revision, len(value))
+ copy(npr.Children[key], value)
}
}
+func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ npr.Children[name] = make([]Revision, len(children))
+ copy(npr.Children[name], children)
+}
+
func (npr *NonPersistedRevision) GetAllChildren() map[string][]Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
return npr.Children
}
func (npr *NonPersistedRevision) GetChildren(name string) []Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
if _, exists := npr.Children[name]; exists {
return npr.Children[name]
}
@@ -160,22 +179,11 @@
}
func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
-
- if !skipOnExist {
- npr.Hash = npr.hashContent()
- }
- if _, exists := GetRevCache().Cache[npr.Hash]; !exists {
- GetRevCache().Cache[npr.Hash] = npr
- }
- if _, exists := GetRevCache().Cache[npr.Config.Hash]; !exists {
- GetRevCache().Cache[npr.Config.Hash] = npr.Config
- } else {
- npr.Config = GetRevCache().Cache[npr.Config.Hash].(*DataRevision)
- }
+ npr.Hash = npr.hashContent()
}
+// hashContent generates a hash string based on the contents of the revision.
+// The string should be unique to avoid conflicts with other revisions
func (npr *NonPersistedRevision) hashContent() string {
var buffer bytes.Buffer
var childrenKeys []string
@@ -184,6 +192,10 @@
buffer.WriteString(npr.Config.Hash)
}
+ if npr.Name != "" {
+ buffer.WriteString(npr.Name)
+ }
+
for key := range npr.Children {
childrenKeys = append(childrenKeys, key)
}
@@ -204,18 +216,20 @@
return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
}
+// Get will retrieve the data for the current revision
func (npr *NonPersistedRevision) Get(depth int) interface{} {
// 1. Clone the data to avoid any concurrent access issues
// 2. The current rev might still be pointing to an old config
// thus, force the revision to get its latest value
latestRev := npr.GetBranch().GetLatest()
originalData := proto.Clone(latestRev.GetData().(proto.Message))
-
data := originalData
- // Get back to the interface type
- //data := reflect.ValueOf(originalData).Interface()
if depth != 0 {
+ // FIXME: Traversing the struct through reflection sometimes corrupts the data.
+ // Unlike the original python implementation, golang structs are not lazy loaded.
+ // Keeping this non-critical logic for now, but Get operations should be forced to
+ // depth=0 to avoid going through the following loop.
for fieldName, field := range ChildrenFields(latestRev.GetData()) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
if field.IsContainer {
@@ -235,8 +249,8 @@
}
}
} else {
- if revs := latestRev.GetChildren(fieldName); revs != nil && len(revs) > 0 {
- rev := latestRev.GetChildren(fieldName)[0]
+ if revs := npr.GetBranch().GetLatest().GetChildren(fieldName); revs != nil && len(revs) > 0 {
+ rev := revs[0]
if rev != nil {
childData := rev.Get(depth - 1)
if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
@@ -260,22 +274,27 @@
return result
}
+// UpdateData will refresh the data content of the revision
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
+ // Do not update the revision if data is the same
if npr.Config.Data != nil && npr.Config.hashData(npr.Root, data) == npr.Config.Hash {
log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
return npr
}
+ // Construct a new revision based on the current one
newRev := NonPersistedRevision{}
newRev.Config = NewDataRevision(npr.Root, data)
newRev.Hash = npr.Hash
+ newRev.Root = npr.Root
+ newRev.Name = npr.Name
newRev.Branch = branch
newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
@@ -284,44 +303,91 @@
return &newRev
}
+// UpdateChildren will refresh the list of children with the provided ones
+// It will carefully go through the list and ensure that no child is lost
func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
- updatedRev := npr
-
- // Verify if the map contains already contains an entry matching the name value
- // If so, we need to retain the contents of that entry and merge them with the provided children revision list
- if _, exists := updatedRev.Children[name]; exists {
- // Go through all child hashes and save their index within the map
- existChildMap := make(map[string]int)
- for i, child := range updatedRev.Children[name] {
- existChildMap[child.GetHash()] = i
- }
-
- for _, newChild := range children {
- if _, childExists := existChildMap[newChild.GetHash()]; !childExists {
- // revision is not present in the existing list... add it
- updatedRev.Children[name] = append(updatedRev.Children[name], newChild)
- } else {
- // replace
- updatedRev.Children[name][existChildMap[newChild.GetHash()]] = newChild
- }
- }
- } else {
- // Map entry does not exist, thus just create a new entry and assign the provided revisions
- updatedRev.Children[name] = make([]Revision, len(children))
- copy(updatedRev.Children[name], children)
- }
-
+ // Construct a new revision based on the current one
+ updatedRev := &NonPersistedRevision{}
updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
updatedRev.Hash = npr.Hash
updatedRev.Branch = branch
+ updatedRev.Name = npr.Name
+
+ updatedRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
+ updatedRev.Children[entryName] = append(updatedRev.Children[entryName], childrenEntry...)
+ }
+
+ var updatedChildren []Revision
+
+ // Verify if the map contains already contains an entry matching the name value
+ // If so, we need to retain the contents of that entry and merge them with the provided children revision list
+ if existingChildren := branch.GetLatest().GetChildren(name); existingChildren != nil {
+ // Construct a map of unique child names with the respective index value
+ // for the children in the existing revision as well as the new ones
+ existingNames := make(map[string]int)
+ newNames := make(map[string]int)
+
+ for i, newChild := range children {
+ newNames[newChild.GetName()] = i
+ }
+
+ for i, existingChild := range existingChildren {
+ existingNames[existingChild.GetName()] = i
+
+ // If an existing entry is not in the new list, add it to the updated list, so it is not forgotten
+ if _, exists := newNames[existingChild.GetName()]; !exists {
+ updatedChildren = append(updatedChildren, existingChild)
+ }
+ }
+
+ log.Debugw("existing-children-names", log.Fields{"hash": npr.GetHash(), "names": existingNames})
+
+ // Merge existing and new children
+ for _, newChild := range children {
+ nameIndex, nameExists := existingNames[newChild.GetName()]
+
+ // Does the existing list contain a child with that name?
+ if nameExists {
+ // Check if the data has changed or not
+ if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
+ // replace entry
+ newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
+ updatedChildren = append(updatedChildren, newChild)
+ } else {
+ // keep existing entry
+ updatedChildren = append(updatedChildren, existingChildren[nameIndex])
+ }
+ } else {
+ // new entry ... just add it
+ updatedChildren = append(updatedChildren, newChild)
+ }
+ }
+
+ // Save children in new revision
+ updatedRev.SetChildren(name, updatedChildren)
+
+ updatedNames := make(map[string]int)
+ for i, updatedChild := range updatedChildren {
+ updatedNames[updatedChild.GetName()] = i
+ }
+
+ log.Debugw("updated-children-names", log.Fields{"hash": npr.GetHash(), "names": updatedNames})
+
+ } else {
+ // There are no children available, just save the provided ones
+ updatedRev.SetChildren(name, children)
+ }
+
updatedRev.Finalize(false)
return updatedRev
}
+// UpdateAllChildren will replace the current list of children with the provided ones
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -330,6 +396,8 @@
newRev.Config = npr.Config
newRev.Hash = npr.Hash
newRev.Branch = branch
+ newRev.Name = npr.Name
+
newRev.Children = make(map[string][]Revision)
for entryName, childrenEntry := range children {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
@@ -339,14 +407,25 @@
return newRev
}
+// Drop is used to indicate when a revision is no longer required
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
+ log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "stack": string(debug.Stack())})
+ npr.discarded = true
+}
- if includeConfig {
- delete(GetRevCache().Cache, npr.Config.Hash)
+// ChildDrop will remove a child entry matching the provided parameters from the current revision
+func (npr *NonPersistedRevision) ChildDrop(childType string, childHash string) {
+ if childType != "" {
+ children := make([]Revision, len(npr.GetChildren(childType)))
+ copy(children, npr.GetChildren(childType))
+ for i, child := range children {
+ if child.GetHash() == childHash {
+ children = append(children[:i], children[i+1:]...)
+ npr.SetChildren(childType, children)
+ break
+ }
+ }
}
- delete(GetRevCache().Cache, npr.Hash)
}
func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
@@ -361,3 +440,4 @@
func (pr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
// stub ... required by interface
}
+
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index c2a6c64..cf7ff9e 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,9 +19,7 @@
import (
"bytes"
"compress/gzip"
- "encoding/hex"
"github.com/golang/protobuf/proto"
- "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -42,6 +40,20 @@
isWatched bool
}
+type watchCache struct {
+ Cache sync.Map
+}
+
+var watchCacheInstance *watchCache
+var watchCacheOne sync.Once
+
+func Watches() *watchCache {
+ watchCacheOne.Do(func() {
+ watchCacheInstance = &watchCache{Cache: sync.Map{}}
+ })
+ return watchCacheInstance
+}
+
// NewPersistedRevision creates a new instance of a PersistentRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
@@ -55,11 +67,6 @@
pr.store(skipOnExist)
}
-type revData struct {
- Children map[string][]string
- Config string
-}
-
func (pr *PersistedRevision) store(skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
@@ -92,97 +99,43 @@
}
func (pr *PersistedRevision) SetupWatch(key string) {
+ if key == "" {
+ log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ return
+ }
+
+ if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+ return
+ }
+
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
pr.SetName(key)
pr.events = pr.kvStore.CreateWatch(key)
+ }
+ if !pr.isWatched {
pr.isWatched = true
+ log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+
// Start watching
go pr.startWatching()
}
}
-func (pr *PersistedRevision) updateInMemory(data interface{}) {
- pr.mutex.Lock()
- defer pr.mutex.Unlock()
-
- var pac *proxyAccessControl
- var pathLock string
-
- //
- // If a proxy exists for this revision, use it to lock access to the path
- // and prevent simultaneous updates to the object in memory
- //
- if pr.GetNode().GetProxy() != nil {
- pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
-
- // If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
- if PAC().IsReserved(pathLock) {
- switch pr.GetNode().GetRoot().GetProxy().Operation {
- case PROXY_ADD:
- fallthrough
- case PROXY_REMOVE:
- fallthrough
- case PROXY_UPDATE:
- log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- return
- default:
- log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
- }
- } else {
- log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- }
-
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
-
- pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
- pac.SetProxy(pr.GetNode().GetProxy())
- pac.lock()
-
- defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
- defer pac.unlock()
- defer PAC().ReleasePath(pathLock)
- }
-
- //
- // Update the object in memory through a transaction
- // This will allow for the object to be subsequently merged with any changes
- // that might have occurred in memory
- //
-
- log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
-
- // Prepare the transaction
- branch := pr.GetBranch()
- latest := branch.GetLatest()
- txidBin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txidBin)[:12]
-
- makeBranch := func(node *node) *Branch {
- return node.MakeBranch(txid)
- }
-
- // Apply the update in a transaction branch
- updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
- updatedRev.SetName(latest.GetName())
-
- // Merge the transaction branch in memory
- if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
- branch.SetLatest(mergedRev)
- }
-}
-
func (pr *PersistedRevision) startWatching() {
- log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
StopWatchLoop:
for {
+ if pr.IsDiscarded() {
+ break StopWatchLoop
+ }
+
select {
case event, ok := <-pr.events:
if !ok {
@@ -209,7 +162,9 @@
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
} else {
- pr.updateInMemory(data.Interface())
+ if pr.GetNode().GetProxy() != nil {
+ pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+ }
}
}
@@ -219,110 +174,9 @@
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
+ Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
- log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
-
- var response []Revision
- var rev Revision
-
- rev = pr
-
- if pr.kvStore != nil && path != "" {
- blobMap, _ := pr.kvStore.List(path)
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(rev.GetBranch().Node.Type)[name]
-
- if field != nil && field.IsContainer {
- var children []Revision
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
- existChildMap := make(map[string]int)
- for i, child := range rev.GetChildren(name) {
- existChildMap[child.GetHash()] = i
- }
-
- for _, blob := range blobMap {
- output := blob.Value.([]byte)
-
- data := reflect.New(field.ClassType.Elem())
-
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- log.Errorw(
- "loading-from-persistence--failed-to-unmarshal",
- log.Fields{"path": path, "txid": txid, "error": err},
- )
- } else if field.Key != "" {
- var key reflect.Value
- var keyValue interface{}
- var keyStr string
-
- if path == "" {
- // e.g. /logical_devices --> path="" name=logical_devices key=""
- _, key = GetAttributeValue(data.Interface(), field.Key, 0)
- keyStr = key.String()
-
- } else {
- // e.g.
- // /logical_devices/abcde --> path="abcde" name=logical_devices
- // /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
-
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue = field.KeyFromStr(key)
- keyStr = keyValue.(string)
-
- if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
- // Key is memory, continue recursing path
- if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
- children[idx] = newChildRev[0]
-
- rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
- response = append(response, newChildRev[0])
- continue
- }
- }
- }
-
- childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
- childRev.SetName(name + "/" + keyStr)
-
- // Do not process a child that is already in memory
- if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
- // Create watch for <component>/<key>
- childRev.SetupWatch(childRev.GetName())
-
- children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, rev.GetBranch())
-
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
- }
- response = append(response, childRev)
- continue
- }
- }
- }
- }
-
- return response
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -335,6 +189,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -350,6 +215,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -365,6 +241,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -407,3 +294,182 @@
pr.Revision.Drop(txid, includeConfig)
}
+
+// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
+func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+ rev := pr
+
+ children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
+ copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+
+ // Verify if the revision contains a child that matches that key
+ if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+ // A child matching the provided key exists in memory
+ // Verify if the data differs to what was retrieved from persistence
+ if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+ log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+
+ // Data has changed; replace the child entry and update the parent revision
+ updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev.SetupWatch(updatedChildRev.GetName())
+ childRev.Drop(txid, false)
+
+ if childIdx >= 0 {
+ children[childIdx] = updatedChildRev
+ } else {
+ children = append(children, updatedChildRev)
+ }
+
+ rev.GetBranch().LatestLock.Lock()
+ updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Drop the previous child revision
+ rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+
+ if updatedChildRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": updatedChildRev.GetHash(),
+ "name": updatedChildRev.GetName(),
+ })
+ response = updatedChildRev
+ }
+ } else {
+ // Data is the same. Continue to the next entry
+ log.Debugw("verify-persisted-entry--same-data", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+ } else {
+ // There is no available child with that key value.
+ // Create a new child and update the parent revision.
+ log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
+ "key": keyValue,
+ "name": typeName,
+ })
+
+ // Construct a new child node with the retrieved persistence data
+ childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+
+ // We need to start watching this entry for future changes
+ childRev.SetName(typeName + "/" + keyValue)
+
+ // Add the child to the parent revision
+ rev.GetBranch().LatestLock.Lock()
+ children = append(children, childRev)
+ updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+ childRev.SetupWatch(childRev.GetName())
+
+ //rev.GetBranch().Node.Latest().Drop(txid, false)
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Child entry is valid and can be included in the response object
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+
+ return response
+}
+
+// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
+// by adding missing entries, updating changed entries and ignoring unchanged ones
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
+ log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
+ var response []Revision
+ var rev Revision
+
+ rev = pr
+
+ if pr.kvStore != nil && path != "" {
+ blobMap, _ := pr.kvStore.List(path)
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+ if field != nil && field.IsContainer {
+ log.Debugw("load-from-persistence--start-blobs", log.Fields{
+ "path": path,
+ "name": name,
+ "size": len(blobMap),
+ })
+
+ for _, blob := range blobMap {
+ output := blob.Value.([]byte)
+
+ data := reflect.New(field.ClassType.Elem())
+
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
+ "path": path,
+ "txid": txid,
+ "error": err,
+ })
+ } else if path == "" {
+ if field.Key != "" {
+ // Retrieve the key identifier value from the data structure
+ // based on the field's key attribute
+ _, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+
+ } else if field.Key != "" {
+ // The request is for a specific entry/id
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+ }
+
+ log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
+ } else {
+ log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
+ "type": rev.GetBranch().Node.Type,
+ "name": name,
+ })
+ }
+ }
+
+ return response
+}
\ No newline at end of file
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 86d426a..eb3cb71 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -147,6 +147,7 @@
PROXY_ADD
PROXY_UPDATE
PROXY_REMOVE
+ PROXY_CREATE
)
// parseForControlledPath verifies if a proxy path matches a pattern
@@ -243,6 +244,9 @@
p.Operation = PROXY_UPDATE
pac.SetProxy(p)
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
log.Debugw("proxy-operation--update", log.Fields{"operation":p.Operation})
@@ -275,6 +279,10 @@
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_ADD
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
+
pac.SetProxy(p)
log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
@@ -307,6 +315,9 @@
p.Operation = PROXY_ADD
pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
@@ -338,12 +349,50 @@
p.Operation = PROXY_REMOVE
pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
log.Debugw("proxy-operation--remove", log.Fields{"operation":p.Operation})
return pac.Remove(fullPath, txid, controlled)
}
+// CreateProxy to interact with specific path directly
+func (p *Proxy) CreateProxy(path string, exclusive bool) *Proxy {
+ if !strings.HasPrefix(path, "/") {
+ log.Errorf("invalid path: %s", path)
+ return nil
+ }
+
+ var fullPath string
+ var effectivePath string
+ if path == "/" {
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
+ } else {
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath() + path
+ }
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+
+ pac := PAC().ReservePath(path, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_CREATE
+ pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
+
+ log.Debugw("proxy-operation--create-proxy", log.Fields{"operation":p.Operation})
+
+ return pac.CreateProxy(fullPath, exclusive, controlled)
+}
+
// OpenTransaction creates a new transaction branch to isolate operations made to the data model
func (p *Proxy) OpenTransaction() *Transaction {
txid := p.GetRoot().MakeTxBranch()
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 66d3222..2a5d034 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -109,6 +109,7 @@
// lock will prevent access to a model path
func (pac *proxyAccessControl) lock() {
+ log.Debugw("locking", log.Fields{"path": pac.Path})
pac.PathLock <- struct{}{}
pac.setStart(time.Now())
}
@@ -116,6 +117,7 @@
// unlock will release control of a model path
func (pac *proxyAccessControl) unlock() {
<-pac.PathLock
+ log.Debugw("unlocking", log.Fields{"path": pac.Path})
pac.setStop(time.Now())
GetProfiling().AddToInMemoryLockTime(pac.getStop().Sub(pac.getStart()).Seconds())
}
@@ -243,3 +245,20 @@
return pac.getProxy().GetRoot().Remove(path, txid, nil)
}
+
+// CreateProxy allows interaction for a specific path
+func (pac *proxyAccessControl) CreateProxy(path string, exclusive bool, control bool) *Proxy {
+ if control {
+ pac.lock()
+ log.Debugw("locked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Path})
+ defer pac.unlock()
+ defer log.Debugw("unlocked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
+ }
+
+ result := pac.getProxy().GetRoot().CreateProxy(path, exclusive)
+
+ if result != nil {
+ return result
+ }
+ return nil
+}
diff --git a/db/model/revision.go b/db/model/revision.go
index 2c10137..79620e1 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -17,10 +17,12 @@
type Revision interface {
Finalize(bool)
+ IsDiscarded() bool
SetConfig(revision *DataRevision)
GetConfig() *DataRevision
Drop(txid string, includeConfig bool)
StorageDrop(txid string, includeConfig bool)
+ ChildDrop(childType string, childHash string)
SetChildren(name string, children []Revision)
GetChildren(name string) []Revision
SetAllChildren(children map[string][]Revision)
diff --git a/db/model/root.go b/db/model/root.go
index 8f9e001..7a29c0b 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -119,11 +119,11 @@
r.Callbacks = r.Callbacks[1:]
go callback.Execute(nil)
}
- for len(r.NotificationCallbacks) > 0 {
- callback := r.NotificationCallbacks[0]
- r.NotificationCallbacks = r.NotificationCallbacks[1:]
- go callback.Execute(nil)
- }
+ //for len(r.NotificationCallbacks) > 0 {
+ // callback := r.NotificationCallbacks[0]
+ // r.NotificationCallbacks = r.NotificationCallbacks[1:]
+ // go callback.Execute(nil)
+ //}
}
func (r *root) hasCallbacks() bool {