VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend API to allow passing a locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index fa35eca..0ecb5ef 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,7 +19,9 @@
import (
"bytes"
"compress/gzip"
+ "encoding/hex"
"github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -105,6 +107,57 @@
}
}
+// updateInMemory applies data to the in-memory copy of this revision.
+//
+// The update is made on a short-lived transaction branch that is merged back
+// afterwards, so that it can be reconciled with any changes that occurred in
+// memory in the meantime. When the revision's node has a proxy, the
+// controlled path is reserved and locked for the duration of the update.
+func (pr *PersistedRevision) updateInMemory(data interface{}) {
+	//
+	// If a proxy exists for this revision, use it to lock access to the path
+	// and prevent simultaneous updates to the object in memory
+	//
+	if proxy := pr.GetNode().GetProxy(); proxy != nil {
+		fullPath := proxy.getFullPath()
+
+		// Resolve the controlled path before logging it; the second return
+		// value is not needed here.
+		pathLock, _ := proxy.parseForControlledPath(fullPath)
+		log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+
+		pac := PAC().ReservePath(fullPath, proxy, pathLock)
+		pac.SetProxy(proxy)
+		pac.lock()
+
+		// Deferred calls run in reverse order at function exit: the path is
+		// released, the lock is dropped, and only then is the event logged.
+		defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+		defer pac.unlock()
+		defer PAC().ReleasePath(pathLock)
+	}
+
+	//
+	// Update the object in memory through a transaction
+	// This will allow for the object to be subsequently merged with any changes
+	// that might have occurred in memory
+	//
+
+	log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
+
+	// Prepare the transaction; the id is the first 12 hex characters of a new UUID
+	branch := pr.GetBranch()
+	latest := branch.GetLatest()
+	txidBin, _ := uuid.New().MarshalBinary()
+	txid := hex.EncodeToString(txidBin)[:12]
+
+	makeBranch := func(node *node) *Branch {
+		return node.MakeBranch(txid)
+	}
+
+	// Apply the update in a transaction branch, keeping the previous hash value
+	updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
+	updatedRev.SetHash(latest.GetHash())
+
+	// Merge the transaction branch in memory; the latest revision is left
+	// untouched when the merge yields nothing
+	if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
+		branch.SetLatest(mergedRev)
+	}
+
+	// The transaction branch must be deleted to free-up memory
+	// NOTE(review): the deletion below is currently disabled, so the
+	// transaction branch is not removed here - confirm where it gets cleaned up.
+	//latest.GetNode().GetRoot().DeleteTxBranch(txid)
+}
+
func (pr *PersistedRevision) startWatching() {
log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
@@ -136,16 +189,7 @@
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
} else {
- // Apply changes to current revision
- branch := pr.GetBranch()
- rev := branch.GetLatest()
- updatedRev := rev.UpdateData(data.Interface(), branch)
-
- // ensure that we keep the previous hash value
- updatedRev.SetHash(rev.GetHash())
-
- // Save revision
- branch.SetLatest(updatedRev)
+ pr.updateInMemory(data.Interface())
}
}
@@ -166,7 +210,7 @@
rev = pr
- if pr.kvStore != nil {
+ if pr.kvStore != nil && path != "" {
blobMap, _ := pr.kvStore.List(path)
partition := strings.SplitN(path, "/", 2)
@@ -180,7 +224,7 @@
field := ChildrenFields(rev.GetBranch().Node.Type)[name]
- if field.IsContainer {
+ if field != nil && field.IsContainer {
var children []Revision
children = make([]Revision, len(rev.GetChildren(name)))
copy(children, rev.GetChildren(name))
@@ -226,15 +270,15 @@
if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
// Key is memory, continue recursing path
- newChildRev := childRev.LoadFromPersistence(path, txid)
+ if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
+ children[idx] = newChildRev[0]
- children[idx] = newChildRev[0]
+ rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
- rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
- response = append(response, newChildRev[0])
- continue
+ response = append(response, newChildRev[0])
+ continue
+ }
}
}