VOL-1584: Fix for multi-core cli infinite loop
- Reduced number of calls to kv
- Re-introduced cache logic for in-memory data access
- Misc log updates
Amendments:
- Ensure that we clone the returned cache data
- Give priority to cache but use alternate get methods otherwise
Change-Id: I56ce67f22d9945b7a194f4c6aab0c7fd75dd2f2e
diff --git a/db/model/branch.go b/db/model/branch.go
index ca89df0..5502e63 100644
--- a/db/model/branch.go
+++ b/db/model/branch.go
@@ -93,18 +93,21 @@
// Go through list of children names in current revision and new revision
// and then compare the resulting outputs to ensure that we have not lost any entries.
- var previousNames, latestNames, missingNames []string
- if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
- log.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
- }
+ if level, _ := log.GetPackageLogLevel(); level == log.DebugLevel {
+ var previousNames, latestNames, missingNames []string
- if latestNames = b.retrieveChildrenNames(b.Latest); len(latestNames) > 0 {
- log.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
- }
+ if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
+ log.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
+ }
- if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
- log.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
+ if latestNames = b.retrieveChildrenNames(b.Latest); len(latestNames) > 0 {
+ log.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
+ }
+
+ if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
+ log.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
+ }
}
} else {
diff --git a/db/model/node.go b/db/model/node.go
index 7bfdca0..9ec5ce9 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -24,7 +24,6 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"reflect"
- "runtime/debug"
"strings"
"sync"
)
@@ -127,6 +126,9 @@
// If anything is new, then set the revision as the latest
if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
+ if revision.GetName() != "" {
+ GetRevCache().Cache.Store(revision.GetName(), revision)
+ }
branch.SetLatest(revision)
}
@@ -275,7 +277,7 @@
var result interface{}
var prList []interface{}
- if pr := rev.LoadFromPersistence(path, txid); pr != nil {
+ if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil {
for _, revEntry := range pr {
prList = append(prList, revEntry.GetData())
}
@@ -288,6 +290,7 @@
// Get retrieves the data from a node tree that resides at the specified path
func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
+
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -307,9 +310,15 @@
var result interface{}
- // If there is not request to reconcile, try to get it from memory
+ // If there is no request to reconcile, try to get it from memory
if !reconcile {
- if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ // Try to find an entry matching the path value from one of these sources
+ // 1. Start with the cache which stores revisions by watch names
+ // 2. Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
+ // 3. As a last effort, move on to the KV store
+ if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
+ return proto.Clone(entry.(Revision).GetData().(proto.Message))
+ } else if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
return result
}
}
@@ -317,14 +326,14 @@
// If we got to this point, we are either trying to reconcile with the db or
// or we simply failed at getting information from memory
if n.Root.KvStore != nil {
- var prList []interface{}
- if pr := rev.LoadFromPersistence(path, txid); pr != nil && len(pr) > 0 {
+ if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil && len(pr) > 0 {
// Did we receive a single or multiple revisions?
if len(pr) > 1 {
+ var revs []interface{}
for _, revEntry := range pr {
- prList = append(prList, revEntry.GetData())
+ revs = append(revs, revEntry.GetData())
}
- result = prList
+ result = revs
} else {
result = pr[0].GetData()
}
@@ -334,7 +343,7 @@
return result
}
-// getPath traverses the specified path and retrieves the data associated to it
+//getPath traverses the specified path and retrieves the data associated to it
func (n *node) getPath(rev Revision, path string, depth int) interface{} {
if path == "" {
return n.getData(rev, depth)
@@ -472,6 +481,7 @@
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
if childRev == nil {
+ log.Debugw("child-revision-is-nil", log.Fields{"key": keyValue})
return branch.GetLatest()
}
@@ -490,6 +500,7 @@
log.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
newChildRev.ClearHash()
}
+ log.Debugw("child-revisions-have-matching-hash", log.Fields{"hash": childRev.GetHash(), "key": keyValue})
return branch.GetLatest()
}
@@ -505,15 +516,15 @@
// Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
newChildRev.SetName(name + "/" + _keyValueType)
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
if idx >= 0 {
children[idx] = newChildRev
} else {
children = append(children, newChildRev)
}
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
updatedRev := rev.UpdateChildren(name, children, branch)
n.makeLatest(branch, updatedRev, nil)
@@ -544,13 +555,11 @@
}
func (n *node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
- log.Debugf("Comparing types - expected: %+v, actual: %+v &&&&&& %s", reflect.ValueOf(n.Type).Type(),
- reflect.TypeOf(data),
- string(debug.Stack()))
+ log.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
// TODO raise error
- log.Errorf("data does not match type: %+v", n.Type)
+ log.Errorw("types-do-not-match: %+v", log.Fields{"actual": reflect.TypeOf(data), "expected": n.Type})
return nil
}
@@ -644,6 +653,7 @@
updatedRev := rev.UpdateChildren(name, children, branch)
changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
+ childRev.GetNode().SetProxy(n.GetProxy())
childRev.SetupWatch(childRev.GetName())
n.makeLatest(branch, updatedRev, changes)
@@ -675,7 +685,10 @@
newChildRev := childNode.Add(path, data, txid, makeBranch)
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
- childRev.SetName(name + "/" + keyValue.(string))
+ newChildRev.SetName(name + "/" + keyValue.(string))
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
if idx >= 0 {
children[idx] = newChildRev
@@ -683,9 +696,6 @@
children = append(children, newChildRev)
}
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
updatedRev := rev.UpdateChildren(name, children, branch)
n.makeLatest(branch, updatedRev, nil)
@@ -758,15 +768,15 @@
}
newChildRev := childNode.Remove(path, txid, makeBranch)
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
if idx >= 0 {
children[idx] = newChildRev
} else {
children = append(children, newChildRev)
}
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
rev.SetChildren(name, children)
branch.GetLatest().Drop(txid, false)
n.makeLatest(branch, rev, nil)
@@ -784,6 +794,7 @@
}
childRev.StorageDrop(txid, true)
+ GetRevCache().Cache.Delete(childRev.GetName())
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 0ccc58e..d7b0b58 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -21,30 +21,28 @@
"fmt"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
"reflect"
- "runtime/debug"
"sort"
"sync"
)
// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
//
-//type revCacheSingleton struct {
-// sync.RWMutex
-// //Cache map[string]interface{}
-// Cache sync.Map
-//}
-//
-//var revCacheInstance *revCacheSingleton
-//var revCacheOnce sync.Once
-//
-//func GetRevCache() *revCacheSingleton {
-// revCacheOnce.Do(func() {
-// //revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
-// revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
-// })
-// return revCacheInstance
-//}
+type revCacheSingleton struct {
+ sync.RWMutex
+ Cache sync.Map
+}
+
+var revCacheInstance *revCacheSingleton
+var revCacheOnce sync.Once
+
+func GetRevCache() *revCacheSingleton {
+ revCacheOnce.Do(func() {
+ revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
+ })
+ return revCacheInstance
+}
type NonPersistedRevision struct {
mutex sync.RWMutex
@@ -409,7 +407,7 @@
// Drop is used to indicate when a revision is no longer required
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "stack": string(debug.Stack())})
+ log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "name": npr.GetName()})
npr.discarded = true
}
@@ -428,7 +426,7 @@
}
}
-func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
// stub... required by interface
return nil
}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index a56b776..ea99cf7 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -23,7 +23,6 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
- "runtime/debug"
"strings"
"sync"
)
@@ -72,10 +71,7 @@
return
}
- if pair, _ := pr.kvStore.Get(pr.GetName()); pair != nil && skipOnExist {
- log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash(), "name": pr.GetName()})
- return
- }
+ log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
// TODO report error
@@ -89,10 +85,9 @@
}
if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
- log.Warnw("problem-storing-revision-config",
- log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+ log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
} else {
- log.Debugw("storing-revision-config", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+ log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
pr.isStored = true
}
}
@@ -100,7 +95,7 @@
func (pr *PersistedRevision) SetupWatch(key string) {
if key == "" {
- log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
return
}
@@ -111,7 +106,7 @@
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
pr.SetName(key)
pr.events = pr.kvStore.CreateWatch(key)
@@ -120,7 +115,7 @@
if !pr.isWatched {
pr.isWatched = true
- log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
// Start watching
go pr.startWatching()
@@ -128,7 +123,7 @@
}
func (pr *PersistedRevision) startWatching() {
- log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
+ log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
StopWatchLoop:
for {
@@ -154,17 +149,106 @@
case kvstore.PUT:
log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
- if dataPair, err := pr.kvStore.Get(pr.GetName()); err != nil || dataPair == nil {
- log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
- } else {
- data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+ data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
- if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
- } else {
- if pr.GetNode().GetProxy() != nil {
- pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+ if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
+ log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+ } else {
+ var pathLock string
+ var pac *proxyAccessControl
+ var blobs map[string]*kvstore.KVPair
+
+ // The watch reported new persistence data.
+ // Construct an object that will be used to update the memory
+ blobs = make(map[string]*kvstore.KVPair)
+ key, _ := kvstore.ToString(event.Key)
+ blobs[key] = &kvstore.KVPair{
+ Key: key,
+ Value: event.Value,
+ Session: "",
+ Lease: 0,
+ }
+
+ if pr.GetNode().GetProxy() != nil {
+ //
+ // If a proxy exists for this revision, use it to lock access to the path
+ // and prevent simultaneous updates to the object in memory
+ //
+ pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+ //If the proxy already has a request in progress, then there is no need to process the watch
+ log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ if PAC().IsReserved(pathLock) {
+ log.Debugw("operation-in-progress", log.Fields{
+ "key": pr.GetHash(),
+ "path": pr.GetNode().GetProxy().getFullPath(),
+ "operation": pr.GetNode().GetRoot().GetProxy().Operation,
+ })
+
+ continue
+
+ // TODO/FIXME: keep logic for now in case we need to control based on running operation
+ //
+ // The code below seems to revert the in-memory/persistence value (occasionally) with
+ // the one received from the watch event.
+ //
+ // The same problem may occur, in the scenario where the core owning a device
+ // receives a watch event for an update made by another core, and when the owning core is
+ // also processing an update. Need to investigate...
+ //
+ //switch pr.GetNode().GetRoot().GetProxy().Operation {
+ //case PROXY_UPDATE:
+ // // We will need to reload once the update operation completes.
+ // // Therefore, the data of the current event is most likely out-dated
+ // // and should be ignored
+ // log.Debugw("reload-required", log.Fields{
+ // "key": pr.GetHash(),
+ // "path": pr.GetNode().GetProxy().getFullPath(),
+ // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
+ // })
+ //
+ // // Eliminate the object constructed earlier
+ // blobs = nil
+ //
+ //case PROXY_ADD:
+ // fallthrough
+ //
+ //case PROXY_REMOVE:
+ // fallthrough
+ //
+ //case PROXY_GET:
+ // fallthrough
+ //
+ //default:
+ // // No need to process the event ... move on
+ // log.Debugw("", log.Fields{
+ // "key": pr.GetHash(),
+ // "path": pr.GetNode().GetProxy().getFullPath(),
+ // "operation": pr.GetNode().GetRoot().GetProxy().Operation,
+ // })
+ //
+ // continue
+ //}
}
+
+ // Reserve the path to prevent others to modify while we reload from persistence
+ log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+ pac.SetProxy(pr.GetNode().GetProxy())
+ pac.lock()
+
+ // Load changes and apply to memory
+ pr.LoadFromPersistence(pr.GetName(), "", blobs)
+
+ log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ pac.unlock()
+ PAC().ReleasePath(pathLock)
+
+ } else {
+ log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+
+ // Load changes and apply to memory
+ pr.LoadFromPersistence(pr.GetName(), "", blobs)
}
}
@@ -176,7 +260,7 @@
Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -196,7 +280,7 @@
newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
- newPR.SetupWatch(newPR.GetName())
+ pr.Drop(branch.Txid, false)
} else {
newPR.isWatched = true
newPR.isStored = true
@@ -206,7 +290,8 @@
}
// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
+ branch *Branch) Revision {
log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
newNPR := pr.Revision.UpdateChildren(name, children, branch)
@@ -222,7 +307,6 @@
newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
- newPR.SetupWatch(newPR.GetName())
} else {
newPR.isWatched = true
newPR.isStored = true
@@ -248,7 +332,6 @@
newPR.isWatched = false
newPR.isStored = false
pr.Drop(branch.Txid, false)
- newPR.SetupWatch(newPR.GetName())
} else {
newPR.isWatched = true
newPR.isStored = true
@@ -267,7 +350,7 @@
// and its associated config when required
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
log.Debugw("dropping-revision",
- log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
+ log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -297,25 +380,28 @@
// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
- rev := pr
+ //rev := pr
- children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
- copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+ children := make([]Revision, len(pr.GetBranch().GetLatest().GetChildren(typeName)))
+ copy(children, pr.GetBranch().GetLatest().GetChildren(typeName))
// Verify if the revision contains a child that matches that key
- if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+ if childIdx, childRev := pr.GetNode().findRevByKey(pr.GetBranch().GetLatest().GetChildren(typeName), keyName,
+ keyValue); childRev != nil {
// A child matching the provided key exists in memory
// Verify if the data differs to what was retrieved from persistence
if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
+ "data": childRev.GetData(),
})
// Data has changed; replace the child entry and update the parent revision
- updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
- updatedChildRev.SetupWatch(updatedChildRev.GetName())
childRev.Drop(txid, false)
+ updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
+ updatedChildRev.SetupWatch(updatedChildRev.GetName())
if childIdx >= 0 {
children[childIdx] = updatedChildRev
@@ -323,18 +409,19 @@
children = append(children, updatedChildRev)
}
- rev.GetBranch().LatestLock.Lock()
- updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
- rev.GetBranch().LatestLock.Unlock()
+ pr.GetBranch().LatestLock.Lock()
+ updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
+ pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
+ pr.GetBranch().LatestLock.Unlock()
// Drop the previous child revision
- rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+ pr.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
if updatedChildRev != nil {
log.Debugw("verify-persisted-entry--adding-child", log.Fields{
"key": updatedChildRev.GetHash(),
"name": updatedChildRev.GetName(),
+ "data": updatedChildRev.GetData(),
})
response = updatedChildRev
}
@@ -343,11 +430,13 @@
log.Debugw("verify-persisted-entry--same-data", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
+ "data": childRev.GetData(),
})
if childRev != nil {
log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
+ "data": childRev.GetData(),
})
response = childRev
}
@@ -358,29 +447,32 @@
log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
"key": keyValue,
"name": typeName,
+ "data": data,
})
// Construct a new child node with the retrieved persistence data
- childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+ childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
// We need to start watching this entry for future changes
childRev.SetName(typeName + "/" + keyValue)
// Add the child to the parent revision
- rev.GetBranch().LatestLock.Lock()
+ pr.GetBranch().LatestLock.Lock()
children = append(children, childRev)
- updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+ updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
+ updatedRev.GetNode().SetProxy(pr.GetNode().GetProxy())
childRev.SetupWatch(childRev.GetName())
//rev.GetBranch().Node.Latest().Drop(txid, false)
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
- rev.GetBranch().LatestLock.Unlock()
+ pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
+ pr.GetBranch().LatestLock.Unlock()
// Child entry is valid and can be included in the response object
if childRev != nil {
log.Debugw("verify-persisted-entry--adding-child", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
+ "data": childRev.GetData(),
})
response = childRev
}
@@ -391,39 +483,46 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
var response []Revision
- var rev Revision
- rev = pr
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
if pr.kvStore != nil && path != "" {
- blobMap, _ := pr.kvStore.List(path)
+ if blobs == nil || len(blobs) == 0 {
+ log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
+ blobs, _ = pr.kvStore.List(path)
+ }
partition := strings.SplitN(path, "/", 2)
name := partition[0]
+ var nodeType interface{}
if len(partition) < 2 {
path = ""
+ nodeType = pr.GetBranch().Node.Type
} else {
path = partition[1]
+ nodeType = pr.GetBranch().Node.Root.Type
}
- field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+ field := ChildrenFields(nodeType)[name]
if field != nil && field.IsContainer {
log.Debugw("load-from-persistence--start-blobs", log.Fields{
"path": path,
"name": name,
- "size": len(blobMap),
+ "size": len(blobs),
})
- for _, blob := range blobMap {
+ for _, blob := range blobs {
output := blob.Value.([]byte)
data := reflect.New(field.ClassType.Elem())
@@ -440,7 +539,8 @@
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
+ txid); entry != nil {
response = append(response, entry)
}
}
@@ -456,7 +556,8 @@
}
keyValue := field.KeyFromStr(key)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
+ txid); entry != nil {
response = append(response, entry)
}
}
@@ -465,7 +566,8 @@
log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
} else {
log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
- "type": rev.GetBranch().Node.Type,
+
+ "type": pr.GetBranch().Node.Type,
"name": name,
})
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index b45fb1d..2933464 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -186,11 +186,20 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, PathLock: %s", path, effectivePath, pathLock)
+ log.Debugw("proxy-list", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+ p.Operation = PROXY_LIST
pac.SetProxy(p)
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
rv := pac.List(path, depth, deep, txid, controlled)
@@ -208,10 +217,16 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, PathLock: %s", path, effectivePath, pathLock)
+ log.Debugw("proxy-get", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+ p.Operation = PROXY_GET
pac.SetProxy(p)
rv := pac.Get(path, depth, deep, txid, controlled)
@@ -237,7 +252,13 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s, Controlled: %b", path, effectivePath, fullPath, pathLock, controlled)
+ log.Debugw("proxy-update", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "full": fullPath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
@@ -247,7 +268,6 @@
defer func(op ProxyOperation) {
pac.getProxy().Operation = op
}(PROXY_GET)
-
log.Debugw("proxy-operation--update", log.Fields{"operation": p.Operation})
return pac.Update(fullPath, data, strict, txid, controlled)
@@ -273,15 +293,21 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+ log.Debugw("proxy-add-with-id", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "full": fullPath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_ADD
- defer func() {
- p.Operation = PROXY_GET
- }()
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
pac.SetProxy(p)
@@ -308,16 +334,22 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+ log.Debugw("proxy-add", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "full": fullPath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_ADD
pac.SetProxy(p)
- defer func() {
- p.Operation = PROXY_GET
- }()
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
log.Debugw("proxy-operation--add", log.Fields{"operation": p.Operation})
@@ -342,16 +374,22 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+ log.Debugw("proxy-remove", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "full": fullPath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_REMOVE
pac.SetProxy(p)
- defer func() {
- p.Operation = PROXY_GET
- }()
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
log.Debugw("proxy-operation--remove", log.Fields{"operation": p.Operation})
@@ -377,16 +415,22 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+ log.Debugw("proxy-create", log.Fields{
+ "path": path,
+ "effective": effectivePath,
+ "full": fullPath,
+ "pathLock": pathLock,
+ "controlled": controlled,
+ })
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_CREATE
pac.SetProxy(p)
- defer func() {
- p.Operation = PROXY_GET
- }()
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
log.Debugw("proxy-operation--create-proxy", log.Fields{"operation": p.Operation})
diff --git a/db/model/revision.go b/db/model/revision.go
index 79620e1..74ae3f7 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -15,6 +15,10 @@
*/
package model
+import (
+ "github.com/opencord/voltha-go/db/kvstore"
+)
+
type Revision interface {
Finalize(bool)
IsDiscarded() bool
@@ -38,7 +42,7 @@
Get(int) interface{}
GetData() interface{}
GetNode() *node
- LoadFromPersistence(path string, txid string) []Revision
+ LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision
UpdateData(data interface{}, branch *Branch) Revision
UpdateChildren(name string, children []Revision, branch *Branch) Revision
UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision