VOL-1243: Added logic for thread safety
- Thread safety was added at the proxy level
- Refactored the test init in a base_test structure
- Fixed issue with writing to kv
- Added profiling for locking period
Amendments:
- Comment out a cleanup statement causing KV corruption (as per VOL-1293)
- Added missing license
Change-Id: Id6658270dbb8b738abeef9e9e1d349dce36501bc
diff --git a/db/model/proxy.go b/db/model/proxy.go
index decbf9b..3827ff3 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -23,6 +23,7 @@
"reflect"
"runtime"
"strings"
+ "sync"
)
type OperationContext struct {
@@ -48,25 +49,68 @@
}
type Proxy struct {
+ sync.Mutex
Root *root
Path string
+ FullPath string
Exclusive bool
Callbacks map[CallbackType]map[string]CallbackTuple
}
-func NewProxy(root *root, path string, exclusive bool) *Proxy {
+func NewProxy(root *root, path string, fullPath string, exclusive bool) *Proxy {
callbacks := make(map[CallbackType]map[string]CallbackTuple)
+ if fullPath == "/" {
+ fullPath = ""
+ }
p := &Proxy{
Root: root,
Exclusive: exclusive,
Path: path,
+ FullPath: fullPath,
Callbacks: callbacks,
}
return p
}
+func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
+ // TODO: Add other path prefixes they may need control
+ if strings.HasPrefix(path, "/devices") {
+ split := strings.SplitN(path, "/", -1)
+ switch len(split) {
+ case 2:
+ controlled = false
+ pathLock = ""
+ break
+ case 3:
+ fallthrough
+ default:
+ pathLock = fmt.Sprintf("%s/%s", split[1], split[2])
+ controlled = true
+ }
+ }
+ return pathLock, controlled
+}
+
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
- return p.Root.Get(path, "", depth, deep, txid)
+ var effectivePath string
+ if path == "/" {
+ effectivePath = p.FullPath
+ } else {
+ effectivePath = p.FullPath + path
+ }
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ var pac interface{}
+ var exists bool
+
+ p.Lock()
+ if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
+ defer GetProxyAccessControl().Cache.Delete(pathLock)
+ }
+ p.Unlock()
+
+ return pac.(ProxyAccessControl).Get(path, depth, deep, txid, controlled)
}
func (p *Proxy) Update(path string, data interface{}, strict bool, txid string) interface{} {
@@ -75,13 +119,27 @@
return nil
}
var fullPath string
+ var effectivePath string
if path == "/" {
fullPath = p.Path
+ effectivePath = p.FullPath
} else {
fullPath = p.Path + path
+ effectivePath = p.FullPath + path
}
- return p.Root.Update(fullPath, data, strict, txid, nil)
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ var pac interface{}
+ var exists bool
+
+ p.Lock()
+ if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
+ defer GetProxyAccessControl().Cache.Delete(pathLock)
+ }
+ p.Unlock()
+
+ return pac.(ProxyAccessControl).Update(fullPath, data, strict, txid, controlled)
}
func (p *Proxy) Add(path string, data interface{}, txid string) interface{} {
@@ -90,12 +148,27 @@
return nil
}
var fullPath string
+ var effectivePath string
if path == "/" {
fullPath = p.Path
+ effectivePath = p.FullPath
} else {
fullPath = p.Path + path
+ effectivePath = p.FullPath + path
}
- return p.Root.Add(fullPath, data, txid, nil)
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ var pac interface{}
+ var exists bool
+
+ p.Lock()
+ if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
+ defer GetProxyAccessControl().Cache.Delete(pathLock)
+ }
+ p.Unlock()
+
+ return pac.(ProxyAccessControl).Add(fullPath, data, txid, controlled)
}
func (p *Proxy) Remove(path string, txid string) interface{} {
@@ -104,12 +177,27 @@
return nil
}
var fullPath string
+ var effectivePath string
if path == "/" {
fullPath = p.Path
+ effectivePath = p.FullPath
} else {
fullPath = p.Path + path
+ effectivePath = p.FullPath + path
}
- return p.Root.Remove(fullPath, txid, nil)
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ var pac interface{}
+ var exists bool
+
+ p.Lock()
+ if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
+ defer GetProxyAccessControl().Cache.Delete(pathLock)
+ }
+ p.Unlock()
+
+ return pac.(ProxyAccessControl).Remove(fullPath, txid, controlled)
}
func (p *Proxy) OpenTransaction() *Transaction {
@@ -199,5 +287,5 @@
}
}
}
- return context
+ return result
}