VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Apply fixes for the issues reported by the Go race detector
- Added version attribute in KV object
- Added context object to db/model api
- Carry timestamp info through the context to inform the decisions
made when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/node.go b/db/model/node.go
index 207df09..fcd3b5f 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -20,6 +20,7 @@
// TODO: proper logging
import (
+ "context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
@@ -32,10 +33,6 @@
// When a branch has no transaction id, everything gets stored in NONE
const (
NONE string = "none"
-
- // period to determine when data requires a refresh (in seconds)
- // TODO: make this configurable?
- DATA_REFRESH_PERIOD int64 = 5000
)
// Node interface is an abstraction of the node data structure
@@ -43,10 +40,14 @@
MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
// CRUD functions
- Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
- Get(path string, hash string, depth int, deep bool, txid string) interface{}
- Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
- Remove(path string, txid string, makeBranch MakeBranchFunction) Revision
+ Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
+ Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
+ Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
+ CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy
+
+ GetProxy() *Proxy
MakeBranch(txid string) *Branch
DeleteBranch(txid string)
@@ -55,16 +56,12 @@
MakeTxBranch() string
DeleteTxBranch(txid string)
FoldTxBranch(txid string)
-
- CreateProxy(path string, exclusive bool) *Proxy
- GetProxy() *Proxy
}
type node struct {
- mutex sync.RWMutex
- Root *root
- Type interface{}
-
+ mutex sync.RWMutex
+ Root *root
+ Type interface{}
Branches map[string]*Branch
Tags map[string]Revision
Proxy *Proxy
@@ -133,7 +130,7 @@
log.Debugw("saving-latest-data", log.Fields{"hash": revision.GetHash(), "data": revision.GetData()})
// Tag a timestamp to that revision
revision.SetLastUpdate()
- GetRevCache().Cache.Store(revision.GetName(), revision)
+ GetRevCache().Set(revision.GetName(), revision)
}
branch.SetLatest(revision)
}
@@ -148,13 +145,13 @@
for _, change := range changeAnnouncement {
log.Debugw("adding-callback",
log.Fields{
- "callbacks": n.Proxy.getCallbacks(change.Type),
+ "callbacks": n.GetProxy().getCallbacks(change.Type),
"type": change.Type,
"previousData": change.PreviousData,
"latestData": change.LatestData,
})
n.Root.AddCallback(
- n.Proxy.InvokeCallbacks,
+ n.GetProxy().InvokeCallbacks,
change.Type,
true,
change.PreviousData,
@@ -253,7 +250,7 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) List(path string, hash string, depth int, deep bool, txid string) interface{} {
+func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{} {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -281,7 +278,7 @@
var result interface{}
var prList []interface{}
- if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil {
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil {
for _, revEntry := range pr {
prList = append(prList, revEntry.GetData())
}
@@ -292,7 +289,7 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
+func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) interface{} {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -323,9 +320,9 @@
// 1. Start with the cache which stores revisions by watch names
// 2. Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
// 3. Move on to the KV store if that path cannot be found or if the entry has expired
- if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
+ if entry, exists := GetRevCache().Get(path); exists && entry.(Revision) != nil {
entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
- if entryAge < DATA_REFRESH_PERIOD {
+ if entryAge < DataRefreshPeriod {
log.Debugw("using-cache-entry", log.Fields{
"path": path,
"hash": hash,
@@ -335,7 +332,7 @@
} else {
log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
}
- } else if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ } else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
log.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
return result
} else {
@@ -357,7 +354,7 @@
// If we got to this point, we are either trying to reconcile with the db
// or we simply failed at getting information from memory
if n.Root.KvStore != nil {
- if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil && len(pr) > 0 {
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil && len(pr) > 0 {
// Did we receive a single or multiple revisions?
if len(pr) > 1 {
var revs []interface{}
@@ -375,7 +372,7 @@
}
//getPath traverses the specified path and retrieves the data associated to it
-func (n *node) getPath(rev Revision, path string, depth int) interface{} {
+func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
if path == "" {
return n.getData(rev, depth)
}
@@ -406,7 +403,7 @@
return nil
} else {
childNode := childRev.GetNode()
- return childNode.getPath(childRev, path, depth)
+ return childNode.getPath(ctx, childRev, path, depth)
}
} else {
var response []interface{}
@@ -430,11 +427,13 @@
}
return response
}
+ } else if children := rev.GetChildren(name); children != nil && len(children) > 0 {
+ childRev := children[0]
+ childNode := childRev.GetNode()
+ return childNode.getPath(ctx, childRev, path, depth)
}
- childRev := rev.GetChildren(name)[0]
- childNode := childRev.GetNode()
- return childNode.getPath(childRev, path, depth)
+ return nil
}
// getData retrieves the data from a node revision
@@ -454,7 +453,7 @@
}
// Update changes the content of a node at the specified path with the provided data
-func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -475,7 +474,7 @@
log.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
}
if path == "" {
- return n.doUpdate(branch, data, strict)
+ return n.doUpdate(ctx, branch, data, strict)
}
rev := branch.GetLatest()
@@ -493,7 +492,7 @@
var children []Revision
if field == nil {
- return n.doUpdate(branch, data, strict)
+ return n.doUpdate(ctx, branch, data, strict)
}
if field.IsContainer {
@@ -523,11 +522,11 @@
// Save proxy in child node to ensure callbacks are called later on
// only assign in cases of non sub-folder proxies, i.e. "/"
- if childNode.Proxy == nil && n.Proxy != nil && n.Proxy.getFullPath() == "" {
+ if childNode.Proxy == nil && n.Proxy != nil && n.GetProxy().getFullPath() == "" {
childNode.Proxy = n.Proxy
}
- newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
if newChildRev.GetHash() == childRev.GetHash() {
if newChildRev != childRev {
@@ -559,7 +558,7 @@
children = append(children, newChildRev)
}
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -572,12 +571,12 @@
} else {
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
- newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
- updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, []Revision{newChildRev}, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -588,7 +587,7 @@
return nil
}
-func (n *node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
+func (n *node) doUpdate(ctx context.Context, branch *Branch, data interface{}, strict bool) Revision {
log.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
@@ -613,7 +612,7 @@
log.Debugf("checking access violations")
}
- rev := branch.GetLatest().UpdateData(data, branch)
+ rev := branch.GetLatest().UpdateData(ctx, data, branch)
changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
n.makeLatest(branch, rev, changes)
@@ -623,7 +622,7 @@
}
// Add inserts a new node at the specified path with the provided data
-func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -688,7 +687,7 @@
children = append(children, childRev)
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
childRev.SetupWatch(childRev.GetName())
@@ -718,7 +717,7 @@
}
childNode := childRev.GetNode()
- newChildRev := childNode.Add(path, data, txid, makeBranch)
+ newChildRev := childNode.Add(ctx, path, data, txid, makeBranch)
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
newChildRev.SetName(name + "/" + keyValue.(string))
@@ -732,7 +731,7 @@
children = append(children, newChildRev)
}
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -749,7 +748,7 @@
}
// Remove eliminates a node at the specified path
-func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -805,7 +804,7 @@
if childNode.Proxy == nil {
childNode.Proxy = n.Proxy
}
- newChildRev := childNode.Remove(path, txid, makeBranch)
+ newChildRev := childNode.Remove(ctx, path, txid, makeBranch)
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
@@ -833,7 +832,7 @@
}
childRev.StorageDrop(txid, true)
- GetRevCache().Cache.Delete(childRev.GetName())
+ GetRevCache().Delete(childRev.GetName())
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
@@ -950,11 +949,11 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// CreateProxy returns a reference to a sub-tree of the data model
-func (n *node) CreateProxy(path string, exclusive bool) *Proxy {
- return n.createProxy(path, path, n, exclusive)
+func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+ return n.createProxy(ctx, path, path, n, exclusive)
}
-func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
log.Debugw("node-create-proxy", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -973,7 +972,6 @@
partition := strings.SplitN(path, "/", 2)
name := partition[0]
var nodeType interface{}
- // Node type is chosen depending on if we have reached the end of path or not
if len(partition) < 2 {
path = ""
nodeType = n.Type
@@ -1020,8 +1018,6 @@
children = make([]Revision, len(rev.GetChildren(name)))
copy(children, rev.GetChildren(name))
- // Try to find a matching revision in memory
- // If not found try the db
var childRev Revision
if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
log.Debugw("found-revision-matching-key-in-memory", log.Fields{
@@ -1030,7 +1026,7 @@
"fullPath": fullPath,
"name": name,
})
- } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(fullPath, "", nil); revs != nil && len(revs) > 0 {
+ } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); revs != nil && len(revs) > 0 {
log.Debugw("found-revision-matching-key-in-db", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -1048,7 +1044,7 @@
}
if childRev != nil {
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
}
} else {
log.Errorw("cannot-access-index-of-empty-container", log.Fields{
@@ -1067,7 +1063,7 @@
})
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
}
} else {
log.Debugw("field-object-is-nil", log.Fields{
@@ -1116,12 +1112,12 @@
n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
} else {
log.Debugw("node-has-existing-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Proxy.Node.Type).Type(),
- "parent-node-type": reflect.ValueOf(n.Proxy.ParentNode.Type).Type(),
- "path": n.Proxy.Path,
- "fullPath": n.Proxy.FullPath,
+ "node-type": reflect.ValueOf(n.GetProxy().Node.Type).Type(),
+ "parent-node-type": reflect.ValueOf(n.GetProxy().ParentNode.Type).Type(),
+ "path": n.GetProxy().Path,
+ "fullPath": n.GetProxy().FullPath,
})
- if n.Proxy.Exclusive {
+ if n.GetProxy().Exclusive {
log.Error("node is already owned exclusively")
}
}
@@ -1160,3 +1156,6 @@
func (n *node) GetRoot() *root {
return n.Root
}
+func (n *node) SetRoot(root *root) {
+ n.Root = root
+}