VOL-1334 : Fixed concurrency issues
- Semaphores were added at the different layers of the model
- Made the proxy interfaces more robust
- Eliminated problems while retrieving latest data in concurrent mode
Change-Id: I7854105d7effa10e5cb704f5d9917569ab184f84
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 29cadf7..811e35d 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -15,6 +15,7 @@
*/
package model
+import "C"
import (
"bytes"
"crypto/md5"
@@ -22,13 +23,27 @@
"github.com/golang/protobuf/proto"
"reflect"
"sort"
+ "sync"
)
-var (
- RevisionCache = make(map[string]interface{})
-)
+type revCacheSingleton struct {
+ sync.RWMutex
+ Cache map[string]interface{}
+}
+
+var revCacheInstance *revCacheSingleton
+var revCacheOnce sync.Once
+
+func GetRevCache() *revCacheSingleton {
+ revCacheOnce.Do(func() {
+ revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
+ })
+ return revCacheInstance
+}
type NonPersistedRevision struct {
+ mutex sync.RWMutex
+ Root *root
Config *DataRevision
Children map[string][]Revision
Hash string
@@ -36,53 +51,73 @@
WeakRef string
}
-func NewNonPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- cr := &NonPersistedRevision{}
- cr.Branch = branch
- cr.Config = NewDataRevision(data)
- cr.Children = children
- cr.Finalize()
-
- return cr
+func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ r := &NonPersistedRevision{}
+ r.Root = root
+ r.Branch = branch
+ r.Config = NewDataRevision(root, data)
+ r.Children = children
+ r.Finalize()
+ return r
}
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Config = config
}
func (npr *NonPersistedRevision) GetConfig() *DataRevision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Config
}
func (npr *NonPersistedRevision) SetChildren(children map[string][]Revision) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Children = children
}
func (npr *NonPersistedRevision) GetChildren() map[string][]Revision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Children
}
func (npr *NonPersistedRevision) SetHash(hash string) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Hash = hash
}
func (npr *NonPersistedRevision) GetHash() string {
+ //npr.mutex.Lock()
+ //defer npr.mutex.Unlock()
return npr.Hash
}
func (npr *NonPersistedRevision) ClearHash() {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Hash = ""
}
func (npr *NonPersistedRevision) SetBranch(branch *Branch) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Branch = branch
}
func (npr *NonPersistedRevision) GetBranch() *Branch {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Branch
}
func (npr *NonPersistedRevision) GetData() interface{} {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
if npr.Config == nil {
return nil
}
@@ -90,19 +125,24 @@
}
func (npr *NonPersistedRevision) GetNode() *node {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Branch.Node
}
func (npr *NonPersistedRevision) Finalize() {
- npr.SetHash(npr.hashContent())
+ GetRevCache().Lock()
+ defer GetRevCache().Unlock()
- if _, exists := RevisionCache[npr.Hash]; !exists {
- RevisionCache[npr.Hash] = npr
+ npr.Hash = npr.hashContent()
+
+ if _, exists := GetRevCache().Cache[npr.Hash]; !exists {
+ GetRevCache().Cache[npr.Hash] = npr
}
- if _, exists := RevisionCache[npr.Config.Hash]; !exists {
- RevisionCache[npr.Config.Hash] = npr.Config
+ if _, exists := GetRevCache().Cache[npr.Config.Hash]; !exists {
+ GetRevCache().Cache[npr.Config.Hash] = npr.Config
} else {
- npr.Config = RevisionCache[npr.Config.Hash].(*DataRevision)
+ npr.Config = GetRevCache().Cache[npr.Config.Hash].(*DataRevision)
}
}
@@ -114,12 +154,13 @@
buffer.WriteString(npr.Config.Hash)
}
- for key, _ := range npr.Children {
+ for key := range npr.Children {
childrenKeys = append(childrenKeys, key)
}
+
sort.Strings(childrenKeys)
- if npr.Children != nil && len(npr.Children) > 0 {
+ if len(npr.Children) > 0 {
// Loop through sorted Children keys
for _, key := range childrenKeys {
for _, child := range npr.Children[key] {
@@ -134,30 +175,38 @@
}
func (npr *NonPersistedRevision) Get(depth int) interface{} {
- originalData := npr.GetData()
- data := reflect.ValueOf(originalData).Interface()
+ // 1. Clone the data to avoid any concurrent access issues
+ // 2. The current rev might still be pointing to an old config
+ // thus, force the revision to get its latest value
+ latestRev := npr.GetBranch().GetLatest()
+ originalData := proto.Clone(latestRev.GetData().(proto.Message))
+
+ data := originalData
+ // Get back to the interface type
+ //data := reflect.ValueOf(originalData).Interface()
if depth != 0 {
- for fieldName, field := range ChildrenFields(npr.GetData()) {
+ for fieldName, field := range ChildrenFields(latestRev.GetData()) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
if field.IsContainer {
- for _, rev := range npr.Children[fieldName] {
+ for _, rev := range latestRev.GetChildren()[fieldName] {
childData := rev.Get(depth - 1)
foundEntry := false
for i := 0; i < childDataHolder.Len(); i++ {
- if reflect.DeepEqual(childDataHolder.Index(i).Interface(), childData) {
+ cdh_if := childDataHolder.Index(i).Interface()
+ if cdh_if.(proto.Message).String() == childData.(proto.Message).String() {
foundEntry = true
break
}
}
if !foundEntry {
- // avoid duplicates by adding if the child was not found in the holder
+ // avoid duplicates by adding it only if the child was not found in the holder
childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
}
}
} else {
- if revs := npr.Children[fieldName]; revs != nil && len(revs) > 0 {
- rev := npr.Children[fieldName][0]
+ if revs := latestRev.GetChildren()[fieldName]; revs != nil && len(revs) > 0 {
+ rev := latestRev.GetChildren()[fieldName][0]
if rev != nil {
childData := rev.Get(depth - 1)
if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
@@ -174,52 +223,81 @@
result := data
if result != nil {
- clone := proto.Clone(data.(proto.Message))
- result = reflect.ValueOf(clone).Interface()
+ // We need to send back a copy of the retrieved object
+ result = proto.Clone(data.(proto.Message))
}
return result
}
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
- // TODO: Need to keep the hash for the old revision.
- // TODO: This will allow us to get rid of the unnecessary data
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetConfig(NewDataRevision(data))
+ newRev := NonPersistedRevision{}
+ newRev.Config = NewDataRevision(npr.Root, data)
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
+
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
+ }
+
newRev.Finalize()
return &newRev
}
func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
- newChildren := make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
- for _, revisionEntry := range childrenEntry {
- newEntry := reflect.ValueOf(revisionEntry).Interface().(Revision)
- newChildren[entryName] = append(newChildren[entryName], newEntry)
- }
- }
- newChildren[name] = children
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetChildren(newChildren)
+ newRev := NonPersistedRevision{}
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = make([]Revision, len(childrenEntry))
+ copy(newRev.Children[entryName], childrenEntry)
+ }
+
+ newRev.Children[name] = make([]Revision, len(children))
+ copy(newRev.Children[name], children)
+
+ newRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
newRev.Finalize()
return &newRev
}
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetChildren(children)
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ newRev := &NonPersistedRevision{}
+ newRev.Config = npr.Config
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = make([]Revision, len(childrenEntry))
+ copy(newRev.Children[entryName], childrenEntry)
+ }
newRev.Finalize()
- return &newRev
+ return newRev
}
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- //npr.SetConfig(nil)
+ GetRevCache().Lock()
+ defer GetRevCache().Unlock()
+
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ if includeConfig {
+ delete(GetRevCache().Cache, npr.Config.Hash)
+ }
+ delete(GetRevCache().Cache, npr.Hash)
}