VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability

- Applied changes as reported by the Go race detector
- Added a version attribute to the KV object
- Added a context object to the db/model API
- Carried timestamp info through the context to help with
  decision making when applying a revision change
- Replaced the proxy access control mechanism with the etcd
  reservation mechanism

Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/node.go b/db/model/node.go
index 207df09..fcd3b5f 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -20,6 +20,7 @@
 // TODO: proper logging
 
 import (
+	"context"
 	"fmt"
 	"github.com/golang/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
@@ -32,10 +33,6 @@
 // When a branch has no transaction id, everything gets stored in NONE
 const (
 	NONE string = "none"
-
-	// period to determine when data requires a refresh (in seconds)
-	// TODO: make this configurable?
-	DATA_REFRESH_PERIOD int64 = 5000
 )
 
 // Node interface is an abstraction of the node data structure
@@ -43,10 +40,14 @@
 	MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
 
 	// CRUD functions
-	Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
-	Get(path string, hash string, depth int, deep bool, txid string) interface{}
-	Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
-	Remove(path string, txid string, makeBranch MakeBranchFunction) Revision
+	Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
+	Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+	List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+	Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
+	Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
+	CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy
+
+	GetProxy() *Proxy
 
 	MakeBranch(txid string) *Branch
 	DeleteBranch(txid string)
@@ -55,16 +56,12 @@
 	MakeTxBranch() string
 	DeleteTxBranch(txid string)
 	FoldTxBranch(txid string)
-
-	CreateProxy(path string, exclusive bool) *Proxy
-	GetProxy() *Proxy
 }
 
 type node struct {
-	mutex sync.RWMutex
-	Root  *root
-	Type  interface{}
-
+	mutex     sync.RWMutex
+	Root      *root
+	Type      interface{}
 	Branches  map[string]*Branch
 	Tags      map[string]Revision
 	Proxy     *Proxy
@@ -133,7 +130,7 @@
 			log.Debugw("saving-latest-data", log.Fields{"hash": revision.GetHash(), "data": revision.GetData()})
 			// Tag a timestamp to that revision
 			revision.SetLastUpdate()
-			GetRevCache().Cache.Store(revision.GetName(), revision)
+			GetRevCache().Set(revision.GetName(), revision)
 		}
 		branch.SetLatest(revision)
 	}
@@ -148,13 +145,13 @@
 			for _, change := range changeAnnouncement {
 				log.Debugw("adding-callback",
 					log.Fields{
-						"callbacks":    n.Proxy.getCallbacks(change.Type),
+						"callbacks":    n.GetProxy().getCallbacks(change.Type),
 						"type":         change.Type,
 						"previousData": change.PreviousData,
 						"latestData":   change.LatestData,
 					})
 				n.Root.AddCallback(
-					n.Proxy.InvokeCallbacks,
+					n.GetProxy().InvokeCallbacks,
 					change.Type,
 					true,
 					change.PreviousData,
@@ -253,7 +250,7 @@
 }
 
 // Get retrieves the data from a node tree that resides at the specified path
-func (n *node) List(path string, hash string, depth int, deep bool, txid string) interface{} {
+func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{} {
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
@@ -281,7 +278,7 @@
 
 	var result interface{}
 	var prList []interface{}
-	if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil {
+	if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil {
 		for _, revEntry := range pr {
 			prList = append(prList, revEntry.GetData())
 		}
@@ -292,7 +289,7 @@
 }
 
 // Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
+func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) interface{} {
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
@@ -323,9 +320,9 @@
 		// 1.  Start with the cache which stores revisions by watch names
 		// 2.  Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
 		// 3.  Move on to the KV store if that path cannot be found or if the entry has expired
-		if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
+		if entry, exists := GetRevCache().Get(path); exists && entry.(Revision) != nil {
 			entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
-			if entryAge < DATA_REFRESH_PERIOD {
+			if entryAge < DataRefreshPeriod {
 				log.Debugw("using-cache-entry", log.Fields{
 					"path": path,
 					"hash": hash,
@@ -335,7 +332,7 @@
 			} else {
 				log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
 			}
-		} else if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+		} else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
 			log.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
 			return result
 		} else {
@@ -357,7 +354,7 @@
 	// If we got to this point, we are either trying to reconcile with the db
 	// or we simply failed at getting information from memory
 	if n.Root.KvStore != nil {
-		if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil && len(pr) > 0 {
+		if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil && len(pr) > 0 {
 			// Did we receive a single or multiple revisions?
 			if len(pr) > 1 {
 				var revs []interface{}
@@ -375,7 +372,7 @@
 }
 
 //getPath traverses the specified path and retrieves the data associated to it
-func (n *node) getPath(rev Revision, path string, depth int) interface{} {
+func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
 	if path == "" {
 		return n.getData(rev, depth)
 	}
@@ -406,7 +403,7 @@
 					return nil
 				} else {
 					childNode := childRev.GetNode()
-					return childNode.getPath(childRev, path, depth)
+					return childNode.getPath(ctx, childRev, path, depth)
 				}
 			} else {
 				var response []interface{}
@@ -430,11 +427,13 @@
 			}
 			return response
 		}
+	} else if children := rev.GetChildren(name); children != nil && len(children) > 0 {
+		childRev := children[0]
+		childNode := childRev.GetNode()
+		return childNode.getPath(ctx, childRev, path, depth)
 	}
 
-	childRev := rev.GetChildren(name)[0]
-	childNode := childRev.GetNode()
-	return childNode.getPath(childRev, path, depth)
+	return nil
 }
 
 // getData retrieves the data from a node revision
@@ -454,7 +453,7 @@
 }
 
 // Update changes the content of a node at the specified path with the provided data
-func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
@@ -475,7 +474,7 @@
 		log.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
 	}
 	if path == "" {
-		return n.doUpdate(branch, data, strict)
+		return n.doUpdate(ctx, branch, data, strict)
 	}
 
 	rev := branch.GetLatest()
@@ -493,7 +492,7 @@
 	var children []Revision
 
 	if field == nil {
-		return n.doUpdate(branch, data, strict)
+		return n.doUpdate(ctx, branch, data, strict)
 	}
 
 	if field.IsContainer {
@@ -523,11 +522,11 @@
 
 			// Save proxy in child node to ensure callbacks are called later on
 			// only assign in cases of non sub-folder proxies, i.e. "/"
-			if childNode.Proxy == nil && n.Proxy != nil && n.Proxy.getFullPath() == "" {
+			if childNode.Proxy == nil && n.Proxy != nil && n.GetProxy().getFullPath() == "" {
 				childNode.Proxy = n.Proxy
 			}
 
-			newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+			newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
 
 			if newChildRev.GetHash() == childRev.GetHash() {
 				if newChildRev != childRev {
@@ -559,7 +558,7 @@
 				children = append(children, newChildRev)
 			}
 
-			updatedRev := rev.UpdateChildren(name, children, branch)
+			updatedRev := rev.UpdateChildren(ctx, name, children, branch)
 
 			n.makeLatest(branch, updatedRev, nil)
 			updatedRev.ChildDrop(name, childRev.GetHash())
@@ -572,12 +571,12 @@
 	} else {
 		childRev := rev.GetChildren(name)[0]
 		childNode := childRev.GetNode()
-		newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+		newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
 
 		branch.LatestLock.Lock()
 		defer branch.LatestLock.Unlock()
 
-		updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
+		updatedRev := rev.UpdateChildren(ctx, name, []Revision{newChildRev}, branch)
 		n.makeLatest(branch, updatedRev, nil)
 
 		updatedRev.ChildDrop(name, childRev.GetHash())
@@ -588,7 +587,7 @@
 	return nil
 }
 
-func (n *node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
+func (n *node) doUpdate(ctx context.Context, branch *Branch, data interface{}, strict bool) Revision {
 	log.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
 
 	if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
@@ -613,7 +612,7 @@
 			log.Debugf("checking access violations")
 		}
 
-		rev := branch.GetLatest().UpdateData(data, branch)
+		rev := branch.GetLatest().UpdateData(ctx, data, branch)
 		changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
 		n.makeLatest(branch, rev, changes)
 
@@ -623,7 +622,7 @@
 }
 
 // Add inserts a new node at the specified path with the provided data
-func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
@@ -688,7 +687,7 @@
 
 				children = append(children, childRev)
 
-				updatedRev := rev.UpdateChildren(name, children, branch)
+				updatedRev := rev.UpdateChildren(ctx, name, children, branch)
 				changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
 				childRev.SetupWatch(childRev.GetName())
 
@@ -718,7 +717,7 @@
 			}
 
 			childNode := childRev.GetNode()
-			newChildRev := childNode.Add(path, data, txid, makeBranch)
+			newChildRev := childNode.Add(ctx, path, data, txid, makeBranch)
 
 			// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
 			newChildRev.SetName(name + "/" + keyValue.(string))
@@ -732,7 +731,7 @@
 				children = append(children, newChildRev)
 			}
 
-			updatedRev := rev.UpdateChildren(name, children, branch)
+			updatedRev := rev.UpdateChildren(ctx, name, children, branch)
 			n.makeLatest(branch, updatedRev, nil)
 
 			updatedRev.ChildDrop(name, childRev.GetHash())
@@ -749,7 +748,7 @@
 }
 
 // Remove eliminates a node at the specified path
-func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
@@ -805,7 +804,7 @@
 					if childNode.Proxy == nil {
 						childNode.Proxy = n.Proxy
 					}
-					newChildRev := childNode.Remove(path, txid, makeBranch)
+					newChildRev := childNode.Remove(ctx, path, txid, makeBranch)
 
 					branch.LatestLock.Lock()
 					defer branch.LatestLock.Unlock()
@@ -833,7 +832,7 @@
 				}
 
 				childRev.StorageDrop(txid, true)
-				GetRevCache().Cache.Delete(childRev.GetName())
+				GetRevCache().Delete(childRev.GetName())
 
 				branch.LatestLock.Lock()
 				defer branch.LatestLock.Unlock()
@@ -950,11 +949,11 @@
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 // CreateProxy returns a reference to a sub-tree of the data model
-func (n *node) CreateProxy(path string, exclusive bool) *Proxy {
-	return n.createProxy(path, path, n, exclusive)
+func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+	return n.createProxy(ctx, path, path, n, exclusive)
 }
 
-func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
 	log.Debugw("node-create-proxy", log.Fields{
 		"node-type":        reflect.ValueOf(n.Type).Type(),
 		"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -973,7 +972,6 @@
 	partition := strings.SplitN(path, "/", 2)
 	name := partition[0]
 	var nodeType interface{}
-	// Node type is chosen depending on if we have reached the end of path or not
 	if len(partition) < 2 {
 		path = ""
 		nodeType = n.Type
@@ -1020,8 +1018,6 @@
 				children = make([]Revision, len(rev.GetChildren(name)))
 				copy(children, rev.GetChildren(name))
 
-				// Try to find a matching revision in memory
-				// If not found try the db
 				var childRev Revision
 				if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
 					log.Debugw("found-revision-matching-key-in-memory", log.Fields{
@@ -1030,7 +1026,7 @@
 						"fullPath":         fullPath,
 						"name":             name,
 					})
-				} else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(fullPath, "", nil); revs != nil && len(revs) > 0 {
+				} else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); revs != nil && len(revs) > 0 {
 					log.Debugw("found-revision-matching-key-in-db", log.Fields{
 						"node-type":        reflect.ValueOf(n.Type).Type(),
 						"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -1048,7 +1044,7 @@
 				}
 				if childRev != nil {
 					childNode := childRev.GetNode()
-					return childNode.createProxy(path, fullPath, n, exclusive)
+					return childNode.createProxy(ctx, path, fullPath, n, exclusive)
 				}
 			} else {
 				log.Errorw("cannot-access-index-of-empty-container", log.Fields{
@@ -1067,7 +1063,7 @@
 			})
 			childRev := rev.GetChildren(name)[0]
 			childNode := childRev.GetNode()
-			return childNode.createProxy(path, fullPath, n, exclusive)
+			return childNode.createProxy(ctx, path, fullPath, n, exclusive)
 		}
 	} else {
 		log.Debugw("field-object-is-nil", log.Fields{
@@ -1116,12 +1112,12 @@
 		n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
 	} else {
 		log.Debugw("node-has-existing-proxy", log.Fields{
-			"node-type":        reflect.ValueOf(n.Proxy.Node.Type).Type(),
-			"parent-node-type": reflect.ValueOf(n.Proxy.ParentNode.Type).Type(),
-			"path":             n.Proxy.Path,
-			"fullPath":         n.Proxy.FullPath,
+			"node-type":        reflect.ValueOf(n.GetProxy().Node.Type).Type(),
+			"parent-node-type": reflect.ValueOf(n.GetProxy().ParentNode.Type).Type(),
+			"path":             n.GetProxy().Path,
+			"fullPath":         n.GetProxy().FullPath,
 		})
-		if n.Proxy.Exclusive {
+		if n.GetProxy().Exclusive {
 			log.Error("node is already owned exclusively")
 		}
 	}
@@ -1160,3 +1156,6 @@
 func (n *node) GetRoot() *root {
 	return n.Root
 }
+func (n *node) SetRoot(root *root) {
+	n.Root = root
+}