VOL-1334 : Fixed concurrency issues
- Semaphores were added at the different layers of the model
- Made the proxy interfaces more robust
- Eliminated problems while retrieving latest data in concurrent mode
Change-Id: I7854105d7effa10e5cb704f5d9917569ab184f84
diff --git a/db/model/root.go b/db/model/root.go
index 45eafb9..a05fbdd 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -21,25 +22,36 @@
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "sync"
"time"
)
+// Root is used to provide an abstraction to the base root structure
type Root interface {
Node
+ Load(rootClass interface{}) *root
+
+ ExecuteCallbacks()
+ AddCallback(callback CallbackFunction, args ...interface{})
+ AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
+// root points to the top of the data model tree or sub-tree identified by a proxy
type root struct {
*node
Callbacks []CallbackTuple
NotificationCallbacks []CallbackTuple
- DirtyNodes map[string][]*node
- KvStore *Backend
- Loading bool
- RevisionClass interface{}
+ DirtyNodes map[string][]*node
+ KvStore *Backend
+ Loading bool
+ RevisionClass interface{}
+
+ mutex sync.RWMutex
}
+// NewRoot creates a new instance of a root object
func NewRoot(initialData interface{}, kvStore *Backend) *root {
root := &root{}
@@ -47,27 +59,34 @@
root.DirtyNodes = make(map[string][]*node)
root.Loading = false
+ // If there is no storage in place, just revert to
+ // a non-persistent mechanism
if kvStore != nil {
root.RevisionClass = reflect.TypeOf(PersistedRevision{})
} else {
root.RevisionClass = reflect.TypeOf(NonPersistedRevision{})
}
+
root.Callbacks = []CallbackTuple{}
root.NotificationCallbacks = []CallbackTuple{}
- root.node = NewNode(root, initialData,false, "")
+ root.node = NewNode(root, initialData, false, "")
return root
}
+// MakeTxBranch creates a new transaction branch
func (r *root) MakeTxBranch() string {
- txid_bin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txid_bin)[:12]
+ txidBin, _ := uuid.New().MarshalBinary()
+ txid := hex.EncodeToString(txidBin)[:12]
+
r.DirtyNodes[txid] = []*node{r.node}
r.node.MakeBranch(txid)
+
return txid
}
+// DeleteTxBranch removes a transaction branch
func (r *root) DeleteTxBranch(txid string) {
for _, dirtyNode := range r.DirtyNodes[txid] {
dirtyNode.DeleteBranch(txid)
@@ -75,16 +94,25 @@
delete(r.DirtyNodes, txid)
}
+// FoldTxBranch will merge the contents of a transaction branch with the root object
func (r *root) FoldTxBranch(txid string) {
- if _, err := r.MergeBranch(txid, true); err != nil {
+ // Start by doing a dry run of the merge
+ // If that fails, it bails out and the branch is deleted
+ if _, err := r.node.MergeBranch(txid, true); err != nil {
+ // Merge operation fails
r.DeleteTxBranch(txid)
} else {
- r.MergeBranch(txid, false)
+ r.node.MergeBranch(txid, false)
r.ExecuteCallbacks()
}
}
+// ExecuteCallbacks will invoke all the callbacks linked to root object
func (r *root) ExecuteCallbacks() {
+ r.mutex.Lock()
+ log.Debugf("ExecuteCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("ExecuteCallbacks released the ROOT lock : %+v", r)
for len(r.Callbacks) > 0 {
callback := r.Callbacks[0]
r.Callbacks = r.Callbacks[1:]
@@ -97,25 +125,55 @@
}
}
-func (r *root) HasCallbacks() bool {
+func (r *root) hasCallbacks() bool {
return len(r.Callbacks) == 0
}
+// GetCallbacks returns the available callbacks
+func (r *root) GetCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ log.Debugf("getCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("getCallbacks released the ROOT lock : %+v", r)
+ return r.Callbacks
+}
+
+// GetNotificationCallbacks returns the available notification callbacks
+func (r *root) GetNotificationCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ log.Debugf("GetNotificationCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("GetNotificationCallbacks released the ROOT lock : %+v", r)
+ return r.NotificationCallbacks
+}
+
+// AddCallback inserts a new callback with its arguments
func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
+ r.mutex.Lock()
+ log.Debugf("AddCallback has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("AddCallback released the ROOT lock : %+v", r)
r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
}
+
+// AddNotificationCallback inserts a new notification callback with its arguments
func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
+ r.mutex.Lock()
+ log.Debugf("AddNotificationCallback has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("AddNotificationCallback released the ROOT lock : %+v", r)
r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
+// Update modifies the content of an object at a given path with the provided data
func (r *root) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -129,19 +187,20 @@
result = r.node.Update(path, data, strict, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Add creates a new object at the given path with the provided data
func (r *root) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -155,19 +214,20 @@
result = r.node.Add(path, data, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Remove discards an object at a given path
func (r *root) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -181,33 +241,43 @@
result = r.node.Remove(path, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Load retrieves data from persistent storage
func (r *root) Load(rootClass interface{}) *root {
//fakeKvStore := &Backend{}
//root := NewRoot(rootClass, nil)
//root.KvStore = r.KvStore
- r.LoadFromPersistence(rootClass)
+ r.loadFromPersistence(rootClass)
return r
}
+// MakeLatest updates a branch with the latest node revision
func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.makeLatest(branch, revision, changeAnnouncement)
}
+func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ if r.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
+ return NewPersistedRevision(branch, data, children)
+ }
+
+ return NewNonPersistedRevision(r, branch, data, children)
+}
+
func (r *root) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.node.makeLatest(branch, revision, changeAnnouncement)
if r.KvStore != nil && branch.Txid == "" {
tags := make(map[string]string)
- for k, v := range r.Tags {
+ for k, v := range r.node.Tags {
tags[k] = v.GetHash()
}
data := &rootData{
- Latest: branch.Latest.GetHash(),
+ Latest: branch.GetLatest().GetHash(),
Tags: tags,
}
if blob, err := json.Marshal(data); err != nil {
@@ -226,7 +296,7 @@
Tags map[string]string `json:tags`
}
-func (r *root) LoadFromPersistence(rootClass interface{}) {
+func (r *root) loadFromPersistence(rootClass interface{}) {
var data rootData
r.Loading = true
@@ -239,10 +309,10 @@
stop := time.Now()
GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
for tag, hash := range data.Tags {
- r.LoadLatest(hash)
- r.Tags[tag] = r.Latest()
+ r.node.LoadLatest(hash)
+ r.node.Tags[tag] = r.node.Latest()
}
- r.LoadLatest(data.Latest)
+ r.node.LoadLatest(data.Latest)
r.Loading = false
}