VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a7cbf2b..a8e6311 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -75,10 +75,10 @@
// Client represents the set of APIs a KV Client must implement
type Client interface {
- List(key string, timeout int) (map[string]*KVPair, error)
- Get(key string, timeout int) (*KVPair, error)
- Put(key string, value interface{}, timeout int) error
- Delete(key string, timeout int) error
+ List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
+ Get(key string, timeout int, lock ...bool) (*KVPair, error)
+ Put(key string, value interface{}, timeout int, lock ...bool) error
+ Delete(key string, timeout int, lock ...bool) error
Reserve(key string, value interface{}, ttl int64) (interface{}, error)
ReleaseReservation(key string) error
ReleaseAllReservations() error
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index e4f6baf..a5c71ac 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -65,7 +65,7 @@
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
+func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
duration := GetDuration(timeout)
kv := c.consul.KV()
@@ -86,7 +86,7 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *ConsulClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
duration := GetDuration(timeout)
@@ -109,7 +109,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
+func (c *ConsulClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
@@ -135,7 +135,7 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Delete(key string, timeout int) error {
+func (c *ConsulClient) Delete(key string, timeout int, lock ...bool) error {
kv := c.consul.KV()
var writeOptions consulapi.WriteOptions
c.writeLock.Lock()
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 490a477..9ecddca 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -16,14 +16,12 @@
package kvstore
import (
- //log "../common"
"context"
"errors"
"fmt"
- //v3Client "github.com/coreos/etcd/clientv3"
- //v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- log "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/log"
v3Client "go.etcd.io/etcd/clientv3"
+ v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"sync"
)
@@ -39,7 +37,6 @@
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
-
duration := GetDuration(timeout)
c, err := v3Client.New(v3Client.Config{
@@ -52,15 +49,26 @@
}
wc := make(map[string][]map[chan *Event]v3Client.Watcher)
reservations := make(map[string]*v3Client.LeaseID)
+
return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
+func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // DO NOT lock by default; otherwise lock per instructed value
+ if len(lock) > 0 && lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
cancel()
if err != nil {
@@ -76,10 +84,20 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *EtcdClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
resp, err := c.ectdAPI.Get(ctx, key)
cancel()
if err != nil {
@@ -96,7 +114,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+func (c *EtcdClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
// Validate that we can convert value to a string as etcd API expects a string
var val string
@@ -108,6 +126,16 @@
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
c.writeLock.Lock()
defer c.writeLock.Unlock()
_, err := c.ectdAPI.Put(ctx, key, val)
@@ -130,11 +158,21 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Delete(key string, timeout int) error {
+func (c *EtcdClient) Delete(key string, timeout int, lock ...bool) error {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
defer cancel()
c.writeLock.Lock()
@@ -228,7 +266,7 @@
}
} else {
// Read the Key to ensure this is our Key
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(key, defaultKVGetTimeout, false)
if err != nil {
return nil, err
}
diff --git a/db/model/backend.go b/db/model/backend.go
index dc0e6bd..981a1d5 100644
--- a/db/model/backend.go
+++ b/db/model/backend.go
@@ -82,26 +82,26 @@
}
// List retrieves one or more items that match the specified key
-func (b *Backend) List(key string) (map[string]*kvstore.KVPair, error) {
+func (b *Backend) List(key string, lock ...bool) (map[string]*kvstore.KVPair, error) {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
- log.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath})
+ log.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath, "lock": lock})
- return b.Client.List(formattedPath, b.Timeout)
+ return b.Client.List(formattedPath, b.Timeout, lock...)
}
// Get retrieves an item that matches the specified key
-func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
+func (b *Backend) Get(key string, lock ...bool) (*kvstore.KVPair, error) {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
- log.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath})
+ log.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath, "lock": lock})
start := time.Now()
- err, pair := b.Client.Get(formattedPath, b.Timeout)
+ err, pair := b.Client.Get(formattedPath, b.Timeout, lock...)
stop := time.Now()
GetProfiling().AddToDatabaseRetrieveTime(stop.Sub(start).Seconds())
@@ -110,25 +110,25 @@
}
// Put stores an item value under the specifed key
-func (b *Backend) Put(key string, value interface{}) error {
+func (b *Backend) Put(key string, value interface{}, lock ...bool) error {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
- log.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath})
+ log.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath, "lock": lock})
- return b.Client.Put(formattedPath, value, b.Timeout)
+ return b.Client.Put(formattedPath, value, b.Timeout, lock...)
}
// Delete removes an item under the specified key
-func (b *Backend) Delete(key string) error {
+func (b *Backend) Delete(key string, lock ...bool) error {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
- log.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath})
+ log.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath, "lock": lock})
- return b.Client.Delete(formattedPath, b.Timeout)
+ return b.Client.Delete(formattedPath, b.Timeout, lock...)
}
// CreateWatch starts watching events for the specified key
diff --git a/db/model/merge.go b/db/model/merge.go
index b230076..c59dda4 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -90,6 +90,8 @@
mergeChildFunc func(Revision) Revision,
dryRun bool) (rev Revision, changes []ChangeTuple) {
+ log.Debugw("3-way-merge-request", log.Fields{"dryRun": dryRun})
+
var configChanged bool
var revsToDiscard []Revision
@@ -246,7 +248,7 @@
}
}
- if !dryRun && len(newChildren) > 0{
+ if !dryRun && len(newChildren) > 0 {
if configChanged {
rev = srcRev
} else {
@@ -257,11 +259,11 @@
discarded.Drop("", true)
}
- dstRev.GetBranch().GetLatest().Drop("", configChanged)
+ // FIXME: Do not discard the latest value for now
+ //dstRev.GetBranch().GetLatest().Drop("", configChanged)
rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
if configChanged {
- // FIXME: what type of previous/latest data do we want to show? Specific node or Root
changes = append(changes, ChangeTuple{POST_UPDATE, dstRev.GetBranch().GetLatest().GetData(), rev.GetData()})
}
return rev, changes
diff --git a/db/model/node.go b/db/model/node.go
index 77ca565..707fe75 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -114,6 +114,9 @@
// makeLatest will mark the revision of a node as being the latest
func (n *node) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+ n.Lock()
+ defer n.Unlock()
+
branch.AddRevision(revision)
if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
@@ -121,17 +124,17 @@
}
if changeAnnouncement != nil && branch.Txid == "" {
- if n.GetProxy() != nil {
+ if n.Proxy != nil {
for _, change := range changeAnnouncement {
- log.Debugw("invoking callback",
- log.Fields{
- "callbacks": n.GetProxy().getCallbacks(change.Type),
- "type": change.Type,
- "previousData": change.PreviousData,
- "latestData": change.LatestData,
- })
- n.GetRoot().AddCallback(
- n.GetProxy().InvokeCallbacks,
+ //log.Debugw("invoking callback",
+ // log.Fields{
+ // "callbacks": n.Proxy.getCallbacks(change.Type),
+ // "type": change.Type,
+ // "previousData": change.PreviousData,
+ // "latestData": change.LatestData,
+ // })
+ n.Root.AddCallback(
+ n.Proxy.InvokeCallbacks,
change.Type,
true,
change.PreviousData,
@@ -139,18 +142,18 @@
}
}
- for _, change := range changeAnnouncement {
- log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v",
- change.Type,
- change.PreviousData,
- change.LatestData)
- n.GetRoot().AddNotificationCallback(
- n.makeEventBus().Advertise,
- change.Type,
- revision.GetHash(),
- change.PreviousData,
- change.LatestData)
- }
+ //for _, change := range changeAnnouncement {
+ //log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v",
+ // change.Type,
+ // change.PreviousData,
+ // change.LatestData)
+ //n.Root.AddNotificationCallback(
+ // n.makeEventBus().Advertise,
+ // change.Type,
+ // revision.GetHash(),
+ // change.PreviousData,
+ // change.LatestData)
+ //}
}
}
@@ -284,12 +287,9 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
- log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "deep": deep, "txid": txid})
- if deep {
- depth = -1
- }
-
+func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
+ log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile,
+ "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -308,11 +308,20 @@
}
var result interface{}
- if result = n.getPath(rev.GetBranch().GetLatest(), path, depth);
- (result == nil || reflect.ValueOf(result).IsValid() && reflect.ValueOf(result).IsNil()) && n.Root.KvStore != nil {
- // We got nothing from memory, try to pull it from persistence
+
+ // If there is no request to reconcile, try to get it from memory
+ if !reconcile {
+ if result = n.getPath(rev.GetBranch().GetLatest(), path, depth);
+ result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ return result
+ }
+ }
+
+ // If we got to this point, we are either trying to reconcile with the db
+ // or we simply failed at getting information from memory
+ if n.Root.KvStore != nil {
var prList []interface{}
- if pr := rev.LoadFromPersistence(path, txid); pr != nil {
+ if pr := rev.LoadFromPersistence(path, txid); pr != nil && len(pr) > 0 {
// Did we receive a single or multiple revisions?
if len(pr) > 1 {
for _, revEntry := range pr {
@@ -551,7 +560,7 @@
// FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
// FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- //branch.GetLatest().Drop(branch.Txid, true)
+ branch.GetLatest().Drop(branch.Txid, false)
n.makeLatest(branch, rev, changes)
@@ -611,7 +620,7 @@
if _, exists := n.findRevByKey(children, field.Key, key.String()); exists != nil {
// TODO raise error
- log.Warnw("duplicate-key-found", log.Fields{"key":key.String()})
+ log.Warnw("duplicate-key-found", log.Fields{"key": key.String()})
return exists
}
childRev := n.MakeNode(data, "").Latest()
@@ -809,6 +818,7 @@
if !dryRun {
if rev != nil {
+ rev.SetHash(dstRev.GetHash())
n.makeLatest(dstBranch, rev, changes)
}
n.DeleteBranch(txid)
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index fa35eca..0ecb5ef 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,7 +19,9 @@
import (
"bytes"
"compress/gzip"
+ "encoding/hex"
"github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -105,6 +107,57 @@
}
}
+func (pr *PersistedRevision) updateInMemory(data interface{}) {
+ var pac *proxyAccessControl
+ var pathLock string
+
+ //
+ // If a proxy exists for this revision, use it to lock access to the path
+ // and prevent simultaneous updates to the object in memory
+ //
+ if pr.GetNode().GetProxy() != nil {
+ log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+ pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+ pac.SetProxy(pr.GetNode().GetProxy())
+ pac.lock()
+
+ defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+ defer pac.unlock()
+ defer PAC().ReleasePath(pathLock)
+ }
+
+ //
+ // Update the object in memory through a transaction
+ // This will allow for the object to be subsequently merged with any changes
+ // that might have occurred in memory
+ //
+
+ log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
+
+ // Prepare the transaction
+ branch := pr.GetBranch()
+ latest := branch.GetLatest()
+ txidBin, _ := uuid.New().MarshalBinary()
+ txid := hex.EncodeToString(txidBin)[:12]
+
+ makeBranch := func(node *node) *Branch {
+ return node.MakeBranch(txid)
+ }
+
+ // Apply the update in a transaction branch
+ updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
+ updatedRev.SetHash(latest.GetHash())
+
+ // Merge the transaction branch in memory
+ if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
+ branch.SetLatest(mergedRev)
+ }
+
+ // The transaction branch must be deleted to free up memory
+ //latest.GetNode().GetRoot().DeleteTxBranch(txid)
+}
+
func (pr *PersistedRevision) startWatching() {
log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
@@ -136,16 +189,7 @@
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
} else {
- // Apply changes to current revision
- branch := pr.GetBranch()
- rev := branch.GetLatest()
- updatedRev := rev.UpdateData(data.Interface(), branch)
-
- // ensure that we keep the previous hash value
- updatedRev.SetHash(rev.GetHash())
-
- // Save revision
- branch.SetLatest(updatedRev)
+ pr.updateInMemory(data.Interface())
}
}
@@ -166,7 +210,7 @@
rev = pr
- if pr.kvStore != nil {
+ if pr.kvStore != nil && path != "" {
blobMap, _ := pr.kvStore.List(path)
partition := strings.SplitN(path, "/", 2)
@@ -180,7 +224,7 @@
field := ChildrenFields(rev.GetBranch().Node.Type)[name]
- if field.IsContainer {
+ if field != nil && field.IsContainer {
var children []Revision
children = make([]Revision, len(rev.GetChildren(name)))
copy(children, rev.GetChildren(name))
@@ -226,15 +270,15 @@
if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
// Key is memory, continue recursing path
- newChildRev := childRev.LoadFromPersistence(path, txid)
+ if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
+ children[idx] = newChildRev[0]
- children[idx] = newChildRev[0]
+ rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
- rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
- response = append(response, newChildRev[0])
- continue
+ response = append(response, newChildRev[0])
+ continue
+ }
}
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 08c0359..be48ded 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -107,6 +107,8 @@
// getCallback returns a specific callback matching the type and function hash
func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
+ p.Lock()
+ defer p.Unlock()
if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
return tuple
}
@@ -222,7 +224,7 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
- log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s, Controlled: %b", path, effectivePath, fullPath, pathLock, controlled)
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 234bcd9..295f153 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -18,7 +18,6 @@
import (
"github.com/opencord/voltha-go/common/log"
- "runtime/debug"
"sync"
"time"
)
@@ -163,8 +162,9 @@
func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
+ log.Debugw("locked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
defer pac.unlock()
- log.Debugf("controlling list, stack = %s", string(debug.Stack()))
+ defer log.Debugw("unlocked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -177,8 +177,9 @@
func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
+ log.Debugw("locked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
defer pac.unlock()
- log.Debugf("controlling get, stack = %s", string(debug.Stack()))
+ defer log.Debugw("unlocked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -190,8 +191,9 @@
func (pac *proxyAccessControl) Update(path string, data interface{}, strict bool, txid string, control bool) interface{} {
if control {
pac.lock()
+ log.Debugw("locked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
defer pac.unlock()
- log.Debugf("controlling update, stack = %s", string(debug.Stack()))
+ defer log.Debugw("unlocked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
}
result := pac.getProxy().GetRoot().Update(path, data, strict, txid, nil)
@@ -205,8 +207,9 @@
func (pac *proxyAccessControl) Add(path string, data interface{}, txid string, control bool) interface{} {
if control {
pac.lock()
+ log.Debugw("locked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
defer pac.unlock()
- log.Debugf("controlling add %s, stack = %s", pac.Path, string(debug.Stack()))
+ defer log.Debugw("unlocked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
}
result := pac.getProxy().GetRoot().Add(path, data, txid, nil)
@@ -220,8 +223,9 @@
func (pac *proxyAccessControl) Remove(path string, txid string, control bool) interface{} {
if control {
pac.lock()
+ log.Debugw("locked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
defer pac.unlock()
- log.Debugf("controlling remove, stack = %s", string(debug.Stack()))
+ defer log.Debugw("unlocked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
}
return pac.getProxy().GetRoot().Remove(path, txid, nil)
}
diff --git a/db/model/root.go b/db/model/root.go
index 0f14c7c..8f9e001 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -104,6 +104,7 @@
} else {
r.node.MergeBranch(txid, false)
r.ExecuteCallbacks()
+ r.DeleteTxBranch(txid)
}
}