VOL-1509 : Partial fix for merging issue
- Changed channel map in etcd to a sync.Map
- Changed graph boundaryPorts to sync.Map
- Added logic to check if proxy access is currently reserved
- Changed watch logic to exit when proxy access in progress
- Fixed UpdateAllChildren method
- Commented out the Drop operation again in node.go
Change-Id: I8a61798e907be0ff6b0785dcc70721708308611d
diff --git a/db/model/node.go b/db/model/node.go
index 707fe75..aeac883 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -560,7 +560,7 @@
// FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
// FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- branch.GetLatest().Drop(branch.Txid, false)
+ //branch.GetLatest().Drop(branch.Txid, false)
n.makeLatest(branch, rev, changes)
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index cb8c9ca..1b9325d 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -313,14 +313,13 @@
npr.mutex.Lock()
defer npr.mutex.Unlock()
- newRev := &NonPersistedRevision{}
+ newRev := npr
newRev.Config = npr.Config
newRev.Hash = npr.Hash
newRev.Branch = branch
newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
- newRev.Children[entryName] = make([]Revision, len(childrenEntry))
- copy(newRev.Children[entryName], childrenEntry)
+ for entryName, childrenEntry := range children {
+ newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
newRev.Finalize(false)
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 0ecb5ef..f335216 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -40,6 +40,7 @@
mutex sync.RWMutex `json:"-"`
isStored bool
isWatched bool
+ watchName string
}
// NewPersistedRevision creates a new instance of a PersistentRevision structure
@@ -96,8 +97,9 @@
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key})
+ log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ pr.watchName = key
pr.events = pr.kvStore.CreateWatch(key)
pr.isWatched = true
@@ -107,7 +109,61 @@
}
}
+//func (pr *PersistedRevision) mergeWithMemory(pacBlock bool) Revision {
+// if pair, err := pr.kvStore.Get(pr.GetHash()); err != nil {
+// log.Debugw("merge-with-memory--error-occurred", log.Fields{"hash": pr.GetHash(), "error": err})
+// } else if pair == nil {
+// log.Debugw("merge-with-memory--no-data-to-merge", log.Fields{"hash": pr.GetHash()})
+// } else {
+// data := reflect.New(reflect.TypeOf(pr.GetData()).Elem()).Interface()
+//
+// if err := proto.Unmarshal(pair.Value.([]byte), data.(proto.Message)); err != nil {
+// log.Errorw("merge-with-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+// } else {
+// if pr.GetNode().GetProxy() != nil && pacBlock {
+// var pac *proxyAccessControl
+// var pathLock string
+//
+// pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+// log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+//
+// pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+// pac.SetProxy(pr.GetNode().GetProxy())
+// pac.lock()
+//
+// defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+// defer pac.unlock()
+// defer PAC().ReleasePath(pathLock)
+// }
+// //Prepare the transaction
+// branch := pr.GetBranch()
+// //latest := branch.GetLatest()
+// txidBin, _ := uuid.New().MarshalBinary()
+// txid := hex.EncodeToString(txidBin)[:12]
+//
+// makeBranch := func(node *node) *Branch {
+// return node.MakeBranch(txid)
+// }
+//
+// //Apply the update in a transaction branch
+// updatedRev := pr.GetNode().Update("", data, false, txid, makeBranch)
+// updatedRev.SetHash(pr.GetHash())
+//
+// //Merge the transaction branch in memory
+// if mergedRev, _ := pr.GetNode().MergeBranch(txid, false); mergedRev != nil {
+// branch.SetLatest(mergedRev)
+// return mergedRev
+// }
+// }
+// }
+//
+// return nil
+//}
+
func (pr *PersistedRevision) updateInMemory(data interface{}) {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
var pac *proxyAccessControl
var pathLock string
@@ -116,8 +172,28 @@
// and prevent simultaneous updates to the object in memory
//
if pr.GetNode().GetProxy() != nil {
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+ // If the proxy already has a request in progress, then there is no need to process the watch
+ log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+ if PAC().IsReserved(pathLock) {
+ switch pr.GetNode().GetRoot().GetProxy().Operation {
+ case PROXY_ADD:
+ fallthrough
+ case PROXY_REMOVE:
+ fallthrough
+ case PROXY_UPDATE:
+ log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ return
+ default:
+ log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
+ }
+ } else {
+ log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
+ }
+
+ log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
+
pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
pac.SetProxy(pr.GetNode().GetProxy())
pac.lock()
@@ -153,9 +229,6 @@
if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
branch.SetLatest(mergedRev)
}
-
- // The transaction branch must be deleted to free-up memory
- //latest.GetNode().GetRoot().DeleteTxBranch(txid)
}
func (pr *PersistedRevision) startWatching() {
@@ -166,40 +239,40 @@
select {
case event, ok := <-pr.events:
if !ok {
- log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
+ log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
break StopWatchLoop
}
- log.Debugw("received-event", log.Fields{"type": event.EventType})
+ log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.watchName})
switch event.EventType {
case kvstore.DELETE:
- log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
pr.Revision.Drop("", true)
break StopWatchLoop
case kvstore.PUT:
- log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
+ log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
- if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
- log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ if dataPair, err := pr.kvStore.Get(pr.watchName); err != nil || dataPair == nil {
+ log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
- log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
+ log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "error": err})
} else {
pr.updateInMemory(data.Interface())
}
}
default:
- log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
+ log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.watchName, "type": event.EventType})
}
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.watchName})
}
func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
diff --git a/db/model/proxy.go b/db/model/proxy.go
index be48ded..86d426a 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -55,13 +55,14 @@
// Proxy holds the information for a specific location with the data model
type Proxy struct {
sync.RWMutex
- Root *root
- Node *node
- ParentNode *node
- Path string
- FullPath string
- Exclusive bool
- Callbacks map[CallbackType]map[string]*CallbackTuple
+ Root *root
+ Node *node
+ ParentNode *node
+ Path string
+ FullPath string
+ Exclusive bool
+ Callbacks map[CallbackType]map[string]*CallbackTuple
+ Operation ProxyOperation
}
// NewProxy instantiates a new proxy to a specific location
@@ -71,13 +72,13 @@
fullPath = ""
}
p := &Proxy{
- Root: root,
- Node: node,
- ParentNode: parentNode,
- Exclusive: exclusive,
- Path: path,
- FullPath: fullPath,
- Callbacks: callbacks,
+ Root: root,
+ Node: node,
+ ParentNode: parentNode,
+ Exclusive: exclusive,
+ Path: path,
+ FullPath: fullPath,
+ Callbacks: callbacks,
}
return p
}
@@ -136,6 +137,18 @@
delete(p.Callbacks[callbackType], funcHash)
}
+// ProxyOperation is an enumerated value that identifies the operation a proxy is currently performing
+type ProxyOperation uint8
+
+// Enumerated list of proxy operation types
+const (
+ PROXY_GET ProxyOperation = iota
+ PROXY_LIST
+ PROXY_ADD
+ PROXY_UPDATE
+ PROXY_REMOVE
+)
+
// parseForControlledPath verifies if a proxy path matches a pattern
// for locations that need to be access controlled.
func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
@@ -183,7 +196,6 @@
return rv
}
-
// Get will retrieve information from the data model at the specified path location
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
@@ -228,8 +240,12 @@
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_UPDATE
pac.SetProxy(p)
+ log.Debugw("proxy-operation--update", log.Fields{"operation":p.Operation})
+
return pac.Update(fullPath, data, strict, txid, controlled)
}
@@ -257,8 +273,12 @@
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_ADD
pac.SetProxy(p)
+ log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+
return pac.Add(fullPath, data, txid, controlled)
}
@@ -284,8 +304,12 @@
pac := PAC().ReservePath(path, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_ADD
pac.SetProxy(p)
+ log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+
return pac.Add(fullPath, data, txid, controlled)
}
@@ -311,8 +335,12 @@
pac := PAC().ReservePath(effectivePath, p, pathLock)
defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_REMOVE
pac.SetProxy(p)
+ log.Debugw("proxy-operation--remove", log.Fields{"operation":p.Operation})
+
return pac.Remove(fullPath, txid, controlled)
}
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 295f153..66d3222 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -24,7 +24,7 @@
type singletonProxyAccessControl struct {
sync.RWMutex
- cache sync.Map
+ cache sync.Map
reservedCount int
}
@@ -39,6 +39,17 @@
return instanceProxyAccessControl
}
+// IsReserved will verify if access control is active for a specific path within the model
+func (singleton *singletonProxyAccessControl) IsReserved(pathLock string) bool {
+ singleton.Lock()
+ defer singleton.Unlock()
+
+ _, exists := singleton.cache.Load(pathLock)
+ log.Debugw("is-reserved", log.Fields{"pathLock": pathLock, "exists": exists})
+
+ return exists
+}
+
// ReservePath will apply access control for a specific path within the model
func (singleton *singletonProxyAccessControl) ReservePath(path string, proxy *Proxy, pathLock string) *proxyAccessControl {
singleton.Lock()
@@ -47,7 +58,7 @@
if pac, exists := singleton.cache.Load(pathLock); !exists {
log.Debugf("Creating new PAC entry for path:%s pathLock:%s", path, pathLock)
newPac := NewProxyAccessControl(proxy, pathLock)
- singleton.cache.Store(pathLock,newPac)
+ singleton.cache.Store(pathLock, newPac)
return newPac
} else {
log.Debugf("Re-using existing PAC entry for path:%s pathLock:%s", path, pathLock)
@@ -162,9 +173,9 @@
func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--list", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -177,9 +188,9 @@
func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--get", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
@@ -191,10 +202,11 @@
func (pac *proxyAccessControl) Update(path string, data interface{}, strict bool, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--update", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
result := pac.getProxy().GetRoot().Update(path, data, strict, txid, nil)
if result != nil {
@@ -207,10 +219,11 @@
func (pac *proxyAccessControl) Add(path string, data interface{}, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--add", log.Fields{"path": path, "fullPath": pac.Path})
defer pac.unlock()
- defer log.Debugw("unlocked-access--add", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--add", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
result := pac.getProxy().GetRoot().Add(path, data, txid, nil)
if result != nil {
@@ -223,9 +236,10 @@
func (pac *proxyAccessControl) Remove(path string, txid string, control bool) interface{} {
if control {
pac.lock()
- log.Debugw("locked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
+ log.Debugw("locked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
defer pac.unlock()
- defer log.Debugw("unlocked-access--remove", log.Fields{"path":pac.Proxy.getFullPath()})
+ defer log.Debugw("unlocked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
}
+
return pac.getProxy().GetRoot().Remove(path, txid, nil)
}