VOL-1497 : Further improved data synchronization between cores
- Introduced locking when modifying branches
- Introduced locking when modifying rev children
- Rewrote persistence loading logic to avoid unecessary changes
- Access controlled CreateProxy to ensure a proxy is not created
against an incomplete device entry
- Removed locking logic from etcd client
- Replaced revision merging logic with persistence loading
VOL-1544 : Cleanup revisions to improve overall performance
- Ensure that old revisions are discarded
- Ensure that children do not contain discarded revisions
- Disabled cache logic for now
Change-Id: I1b952c82aba379fce64a47a71b5309a6f28fb5ff
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 3c39e01..d501c66 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -22,34 +22,41 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "runtime/debug"
"sort"
"sync"
)
-type revCacheSingleton struct {
- sync.RWMutex
- Cache map[string]interface{}
-}
-
-var revCacheInstance *revCacheSingleton
-var revCacheOnce sync.Once
-
-func GetRevCache() *revCacheSingleton {
- revCacheOnce.Do(func() {
- revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
- })
- return revCacheInstance
-}
+// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
+//
+//type revCacheSingleton struct {
+// sync.RWMutex
+// //Cache map[string]interface{}
+// Cache sync.Map
+//}
+//
+//var revCacheInstance *revCacheSingleton
+//var revCacheOnce sync.Once
+//
+//func GetRevCache() *revCacheSingleton {
+// revCacheOnce.Do(func() {
+// //revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
+// revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
+// })
+// return revCacheInstance
+//}
type NonPersistedRevision struct {
- mutex sync.RWMutex
- Root *root
- Config *DataRevision
- Children map[string][]Revision
- Hash string
- Branch *Branch
- WeakRef string
- Name string
+ mutex sync.RWMutex
+ Root *root
+ Config *DataRevision
+ childrenLock sync.RWMutex
+ Children map[string][]Revision
+ Hash string
+ Branch *Branch
+ WeakRef string
+ Name string
+ discarded bool
}
func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
@@ -59,9 +66,14 @@
r.Config = NewDataRevision(root, data)
r.Children = children
r.Hash = r.hashContent()
+ r.discarded = false
return r
}
+func (npr *NonPersistedRevision) IsDiscarded() bool {
+ return npr.discarded
+}
+
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -75,28 +87,35 @@
}
func (npr *NonPersistedRevision) SetAllChildren(children map[string][]Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Children = children
-}
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+ npr.Children = make(map[string][]Revision)
-func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- if _, exists := npr.Children[name]; exists {
- npr.Children[name] = children
+ for key, value := range children {
+ npr.Children[key] = make([]Revision, len(value))
+ copy(npr.Children[key], value)
}
}
+func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ npr.Children[name] = make([]Revision, len(children))
+ copy(npr.Children[name], children)
+}
+
func (npr *NonPersistedRevision) GetAllChildren() map[string][]Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
return npr.Children
}
func (npr *NonPersistedRevision) GetChildren(name string) []Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
if _, exists := npr.Children[name]; exists {
return npr.Children[name]
}
@@ -160,22 +179,11 @@
}
func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
-
- if !skipOnExist {
- npr.Hash = npr.hashContent()
- }
- if _, exists := GetRevCache().Cache[npr.Hash]; !exists {
- GetRevCache().Cache[npr.Hash] = npr
- }
- if _, exists := GetRevCache().Cache[npr.Config.Hash]; !exists {
- GetRevCache().Cache[npr.Config.Hash] = npr.Config
- } else {
- npr.Config = GetRevCache().Cache[npr.Config.Hash].(*DataRevision)
- }
+ npr.Hash = npr.hashContent()
}
+// hashContent generates a hash string based on the contents of the revision.
+// The string should be unique to avoid conflicts with other revisions
func (npr *NonPersistedRevision) hashContent() string {
var buffer bytes.Buffer
var childrenKeys []string
@@ -184,6 +192,10 @@
buffer.WriteString(npr.Config.Hash)
}
+ if npr.Name != "" {
+ buffer.WriteString(npr.Name)
+ }
+
for key := range npr.Children {
childrenKeys = append(childrenKeys, key)
}
@@ -204,18 +216,20 @@
return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
}
+// Get will retrieve the data for the current revision
func (npr *NonPersistedRevision) Get(depth int) interface{} {
// 1. Clone the data to avoid any concurrent access issues
// 2. The current rev might still be pointing to an old config
// thus, force the revision to get its latest value
latestRev := npr.GetBranch().GetLatest()
originalData := proto.Clone(latestRev.GetData().(proto.Message))
-
data := originalData
- // Get back to the interface type
- //data := reflect.ValueOf(originalData).Interface()
if depth != 0 {
+ // FIXME: Traversing the struct through reflection sometimes corrupts the data.
+ // Unlike the original python implementation, golang structs are not lazy loaded.
+ // Keeping this non-critical logic for now, but Get operations should be forced to
+ // depth=0 to avoid going through the following loop.
for fieldName, field := range ChildrenFields(latestRev.GetData()) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
if field.IsContainer {
@@ -235,8 +249,8 @@
}
}
} else {
- if revs := latestRev.GetChildren(fieldName); revs != nil && len(revs) > 0 {
- rev := latestRev.GetChildren(fieldName)[0]
+ if revs := npr.GetBranch().GetLatest().GetChildren(fieldName); revs != nil && len(revs) > 0 {
+ rev := revs[0]
if rev != nil {
childData := rev.Get(depth - 1)
if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
@@ -260,22 +274,27 @@
return result
}
+// UpdateData will refresh the data content of the revision
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
+ // Do not update the revision if data is the same
if npr.Config.Data != nil && npr.Config.hashData(npr.Root, data) == npr.Config.Hash {
log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
return npr
}
+ // Construct a new revision based on the current one
newRev := NonPersistedRevision{}
newRev.Config = NewDataRevision(npr.Root, data)
newRev.Hash = npr.Hash
+ newRev.Root = npr.Root
+ newRev.Name = npr.Name
newRev.Branch = branch
newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
@@ -284,44 +303,91 @@
return &newRev
}
+// UpdateChildren will refresh the list of children with the provided ones
+// It will carefully go through the list and ensure that no child is lost
func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
- updatedRev := npr
-
- // Verify if the map contains already contains an entry matching the name value
- // If so, we need to retain the contents of that entry and merge them with the provided children revision list
- if _, exists := updatedRev.Children[name]; exists {
- // Go through all child hashes and save their index within the map
- existChildMap := make(map[string]int)
- for i, child := range updatedRev.Children[name] {
- existChildMap[child.GetHash()] = i
- }
-
- for _, newChild := range children {
- if _, childExists := existChildMap[newChild.GetHash()]; !childExists {
- // revision is not present in the existing list... add it
- updatedRev.Children[name] = append(updatedRev.Children[name], newChild)
- } else {
- // replace
- updatedRev.Children[name][existChildMap[newChild.GetHash()]] = newChild
- }
- }
- } else {
- // Map entry does not exist, thus just create a new entry and assign the provided revisions
- updatedRev.Children[name] = make([]Revision, len(children))
- copy(updatedRev.Children[name], children)
- }
-
+ // Construct a new revision based on the current one
+ updatedRev := &NonPersistedRevision{}
updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
updatedRev.Hash = npr.Hash
updatedRev.Branch = branch
+ updatedRev.Name = npr.Name
+
+ updatedRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
+ updatedRev.Children[entryName] = append(updatedRev.Children[entryName], childrenEntry...)
+ }
+
+ var updatedChildren []Revision
+
+ // Verify if the map contains already contains an entry matching the name value
+ // If so, we need to retain the contents of that entry and merge them with the provided children revision list
+ if existingChildren := branch.GetLatest().GetChildren(name); existingChildren != nil {
+ // Construct a map of unique child names with the respective index value
+ // for the children in the existing revision as well as the new ones
+ existingNames := make(map[string]int)
+ newNames := make(map[string]int)
+
+ for i, newChild := range children {
+ newNames[newChild.GetName()] = i
+ }
+
+ for i, existingChild := range existingChildren {
+ existingNames[existingChild.GetName()] = i
+
+ // If an existing entry is not in the new list, add it to the updated list, so it is not forgotten
+ if _, exists := newNames[existingChild.GetName()]; !exists {
+ updatedChildren = append(updatedChildren, existingChild)
+ }
+ }
+
+ log.Debugw("existing-children-names", log.Fields{"hash": npr.GetHash(), "names": existingNames})
+
+ // Merge existing and new children
+ for _, newChild := range children {
+ nameIndex, nameExists := existingNames[newChild.GetName()]
+
+ // Does the existing list contain a child with that name?
+ if nameExists {
+ // Check if the data has changed or not
+ if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
+ // replace entry
+ newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
+ updatedChildren = append(updatedChildren, newChild)
+ } else {
+ // keep existing entry
+ updatedChildren = append(updatedChildren, existingChildren[nameIndex])
+ }
+ } else {
+ // new entry ... just add it
+ updatedChildren = append(updatedChildren, newChild)
+ }
+ }
+
+ // Save children in new revision
+ updatedRev.SetChildren(name, updatedChildren)
+
+ updatedNames := make(map[string]int)
+ for i, updatedChild := range updatedChildren {
+ updatedNames[updatedChild.GetName()] = i
+ }
+
+ log.Debugw("updated-children-names", log.Fields{"hash": npr.GetHash(), "names": updatedNames})
+
+ } else {
+ // There are no children available, just save the provided ones
+ updatedRev.SetChildren(name, children)
+ }
+
updatedRev.Finalize(false)
return updatedRev
}
+// UpdateAllChildren will replace the current list of children with the provided ones
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -330,6 +396,8 @@
newRev.Config = npr.Config
newRev.Hash = npr.Hash
newRev.Branch = branch
+ newRev.Name = npr.Name
+
newRev.Children = make(map[string][]Revision)
for entryName, childrenEntry := range children {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
@@ -339,14 +407,25 @@
return newRev
}
+// Drop is used to indicate when a revision is no longer required
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
+ log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "stack": string(debug.Stack())})
+ npr.discarded = true
+}
- if includeConfig {
- delete(GetRevCache().Cache, npr.Config.Hash)
+// ChildDrop will remove a child entry matching the provided parameters from the current revision
+func (npr *NonPersistedRevision) ChildDrop(childType string, childHash string) {
+ if childType != "" {
+ children := make([]Revision, len(npr.GetChildren(childType)))
+ copy(children, npr.GetChildren(childType))
+ for i, child := range children {
+ if child.GetHash() == childHash {
+ children = append(children[:i], children[i+1:]...)
+ npr.SetChildren(childType, children)
+ break
+ }
+ }
}
- delete(GetRevCache().Cache, npr.Hash)
}
func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
@@ -361,3 +440,4 @@
func (pr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
// stub ... required by interface
}
+