VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability

- Apply fixes for the data races reported by the Go race detector
- Add a version attribute to the KV object
- Add a context object to the db/model API
- Carry timestamp info through the context to help decide
  whether a revision change should be applied
- Replace the proxy access control mechanism with the etcd reservation mechanism

Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 2ab91b7..d2d228f 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -19,7 +19,9 @@
 import (
 	"bytes"
 	"compress/gzip"
+	"context"
 	"github.com/golang/protobuf/proto"
+	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"reflect"
@@ -32,11 +34,13 @@
 	Revision
 	Compress bool
 
-	events    chan *kvstore.Event
-	kvStore   *Backend
-	mutex     sync.RWMutex
-	isStored  bool
-	isWatched bool
+	events       chan *kvstore.Event
+	kvStore      *Backend
+	mutex        sync.RWMutex
+	versionMutex sync.RWMutex
+	Version      int64
+	isStored     bool
+	isWatched    bool
 }
 
 type watchCache struct {
@@ -57,10 +61,23 @@
 func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
 	pr := &PersistedRevision{}
 	pr.kvStore = branch.Node.GetRoot().KvStore
+	pr.Version = 1
 	pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
 	return pr
 }
 
+func (pr *PersistedRevision) getVersion() int64 {
+	pr.versionMutex.RLock()
+	defer pr.versionMutex.RUnlock()
+	return pr.Version
+}
+
+func (pr *PersistedRevision) setVersion(version int64) {
+	pr.versionMutex.Lock()
+	defer pr.versionMutex.Unlock()
+	pr.Version = version
+}
+
 // Finalize is responsible of saving the revision in the persistent storage
 func (pr *PersistedRevision) Finalize(skipOnExist bool) {
 	pr.store(skipOnExist)
@@ -73,8 +90,12 @@
 
 	log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
 
-	if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
-		// TODO report error
+	// clone the revision data to avoid any race conditions with processes
+	// accessing the same data
+	cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
+
+	if blob, err := proto.Marshal(cloned); err != nil {
+		log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
 	} else {
 		if pr.Compress {
 			var b bytes.Buffer
@@ -84,10 +105,11 @@
 			blob = b.Bytes()
 		}
 
+		GetRevCache().Set(pr.GetName(), pr)
 		if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
 			log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
 		} else {
-			log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+			log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
 			pr.isStored = true
 		}
 	}
@@ -145,6 +167,20 @@
 
 			case kvstore.PUT:
 				log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+				if latestRev.getVersion() >= event.Version {
+					log.Debugw("skipping-matching-or-older-revision", log.Fields{
+						"watch":          latestRev.GetName(),
+						"watch-version":  event.Version,
+						"latest-version": latestRev.getVersion(),
+					})
+					continue
+				} else {
+					log.Debugw("watch-revision-is-newer", log.Fields{
+						"watch":          latestRev.GetName(),
+						"watch-version":  event.Version,
+						"latest-version": latestRev.getVersion(),
+					})
+				}
 
 				data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
 
@@ -154,7 +190,6 @@
 					log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
 
 					var pathLock string
-					var pac *proxyAccessControl
 					var blobs map[string]*kvstore.KVPair
 
 					// The watch reported new persistence data.
@@ -166,6 +201,7 @@
 						Value:   event.Value,
 						Session: "",
 						Lease:   0,
+						Version: event.Version,
 					}
 
 					if latestRev.GetNode().GetProxy() != nil {
@@ -173,82 +209,35 @@
 						// If a proxy exists for this revision, use it to lock access to the path
 						// and prevent simultaneous updates to the object in memory
 						//
-						pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
 
 						//If the proxy already has a request in progress, then there is no need to process the watch
-						log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
-						if PAC().IsReserved(pathLock) {
+						if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
 							log.Debugw("operation-in-progress", log.Fields{
 								"key":       latestRev.GetHash(),
 								"path":      latestRev.GetNode().GetProxy().getFullPath(),
-								"operation": latestRev.GetNode().GetProxy().Operation.String(),
+								"operation": latestRev.GetNode().GetProxy().operation.String(),
 							})
-
-							//continue
-
-							// Identify the operation type and determine if the watch event should be applied or not.
-							switch latestRev.GetNode().GetProxy().Operation {
-							case PROXY_REMOVE:
-								fallthrough
-
-							case PROXY_ADD:
-								fallthrough
-
-							case PROXY_UPDATE:
-								// We will need to reload once the operation completes.
-								// Therefore, the data of the current event is most likely out-dated
-								// and should be ignored
-								log.Debugw("ignore-watch-event", log.Fields{
-									"key":       latestRev.GetHash(),
-									"path":      latestRev.GetNode().GetProxy().getFullPath(),
-									"operation": latestRev.GetNode().GetProxy().Operation.String(),
-								})
-
-								continue
-
-							case PROXY_CREATE:
-								fallthrough
-
-							case PROXY_LIST:
-								fallthrough
-
-							case PROXY_GET:
-								fallthrough
-
-							case PROXY_WATCH:
-								fallthrough
-
-							default:
-								log.Debugw("process-watch-event", log.Fields{
-									"key":       latestRev.GetHash(),
-									"path":      latestRev.GetNode().GetProxy().getFullPath(),
-									"operation": latestRev.GetNode().GetProxy().Operation.String(),
-								})
-							}
+							continue
 						}
 
+						pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
+
 						// Reserve the path to prevent others to modify while we reload from persistence
-						log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
-						pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
-							latestRev.GetNode().GetProxy(), pathLock)
-						pac.lock()
-						latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
-						pac.SetProxy(latestRev.GetNode().GetProxy())
+						latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+						latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
 
 						// Load changes and apply to memory
-						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
 
-						log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
-						pac.getProxy().Operation = PROXY_GET
-						pac.unlock()
-						PAC().ReleasePath(pathLock)
+						// Release path
+						latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
 
 					} else {
 						// This block should be reached only if coming from a non-proxied request
 						log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
 						// Load changes and apply to memory
-						latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+						latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
 					}
 				}
 
@@ -264,16 +253,17 @@
 }
 
 // UpdateData modifies the information in the data model and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
 	log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
 
-	newNPR := pr.Revision.UpdateData(data, branch)
+	newNPR := pr.Revision.UpdateData(ctx, data, branch)
 
 	newPR := &PersistedRevision{
 		Revision:  newNPR,
 		Compress:  pr.Compress,
 		kvStore:   pr.kvStore,
 		events:    pr.events,
+		Version:   pr.getVersion(),
 		isWatched: pr.isWatched,
 	}
 
@@ -289,17 +279,17 @@
 }
 
 // UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
-	branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
 	log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
 
-	newNPR := pr.Revision.UpdateChildren(name, children, branch)
+	newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
 
 	newPR := &PersistedRevision{
 		Revision:  newNPR,
 		Compress:  pr.Compress,
 		kvStore:   pr.kvStore,
 		events:    pr.events,
+		Version:   pr.getVersion(),
 		isWatched: pr.isWatched,
 	}
 
@@ -324,6 +314,7 @@
 		Compress:  pr.Compress,
 		kvStore:   pr.kvStore,
 		events:    pr.events,
+		Version:   pr.getVersion(),
 		isWatched: pr.isWatched,
 	}
 
@@ -346,8 +337,7 @@
 // Drop takes care of eliminating a revision hash that is no longer needed
 // and its associated config when required
 func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
-	log.Debugw("dropping-revision",
-		log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
+	log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
 
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
@@ -376,9 +366,10 @@
 }
 
 // verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
-func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
+	keyValue string, txid string, version int64) (response Revision) {
 	// Parent which holds the current node entry
-	parent := pr.GetBranch().Node.Root
+	parent := pr.GetBranch().Node.GetRoot()
 
 	// Get a copy of the parent's children
 	children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
@@ -389,11 +380,12 @@
 		// A child matching the provided key exists in memory
 		// Verify if the data differs from what was retrieved from persistence
 		// Also check if we are treating a newer revision of the data or not
-		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
 			log.Debugw("revision-data-is-different", log.Fields{
-				"key":  childRev.GetHash(),
-				"name": childRev.GetName(),
-				"data": childRev.GetData(),
+				"key":     childRev.GetHash(),
+				"name":    childRev.GetName(),
+				"data":    childRev.GetData(),
+				"version": childRev.getVersion(),
 			})
 
 			//
@@ -404,14 +396,15 @@
 			childRev.GetBranch().LatestLock.Lock()
 
 			// Update child
-			updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+			updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
 
 			updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
 			updatedChildRev.SetupWatch(updatedChildRev.GetName())
 			updatedChildRev.SetLastUpdate()
+			updatedChildRev.(*PersistedRevision).setVersion(version)
 
 			// Update cache
-			GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+			GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
 			childRev.Drop(txid, false)
 
 			childRev.GetBranch().LatestLock.Unlock()
@@ -423,7 +416,7 @@
 			// BEGIN lock parent -- Update parent
 			parent.GetBranch(NONE).LatestLock.Lock()
 
-			updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+			updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
 			parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
 
 			parent.GetBranch(NONE).LatestLock.Unlock()
@@ -441,14 +434,8 @@
 				response = updatedChildRev
 			}
 		} else {
-			// Data is the same. Continue to the next entry
-			log.Debugw("same-revision-data", log.Fields{
-				"key":  childRev.GetHash(),
-				"name": childRev.GetName(),
-				"data": childRev.GetData(),
-			})
 			if childRev != nil {
-				log.Debugw("keeping-same-revision-data", log.Fields{
+				log.Debugw("keeping-revision-data", log.Fields{
 					"key":  childRev.GetHash(),
 					"name": childRev.GetName(),
 					"data": childRev.GetData(),
@@ -456,7 +443,10 @@
 
 				// Update timestamp to reflect when it was last read and to reset tracked timeout
 				childRev.SetLastUpdate()
-				GetRevCache().Cache.Store(childRev.GetName(), childRev)
+				if childRev.getVersion() < version {
+					childRev.(*PersistedRevision).setVersion(version)
+				}
+				GetRevCache().Set(childRev.GetName(), childRev)
 				response = childRev
 			}
 		}
@@ -479,6 +469,10 @@
 		// We need to start watching this entry for future changes
 		childRev.SetName(typeName + "/" + keyValue)
 		childRev.SetupWatch(childRev.GetName())
+		childRev.(*PersistedRevision).setVersion(version)
+
+		// Add entry to cache
+		GetRevCache().Set(childRev.GetName(), childRev)
 
 		pr.GetBranch().LatestLock.Unlock()
 		// END child lock
@@ -490,7 +484,7 @@
 		// BEGIN parent lock
 		parent.GetBranch(NONE).LatestLock.Lock()
 		children = append(children, childRev)
-		updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+		updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
 		updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
 		parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
 		parent.GetBranch(NONE).LatestLock.Unlock()
@@ -512,7 +506,7 @@
 
 // LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
 // by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 
@@ -539,7 +533,7 @@
 			nodeType = pr.GetBranch().Node.Type
 		} else {
 			path = partition[1]
-			nodeType = pr.GetBranch().Node.Root.Type
+			nodeType = pr.GetBranch().Node.GetRoot().Type
 		}
 
 		field := ChildrenFields(nodeType)[name]
@@ -574,7 +568,7 @@
 						// based on the field's key attribute
 						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
 
-						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+						if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
 							response = append(response, entry)
 						}
 					} else {
@@ -601,7 +595,7 @@
 					}
 					keyValue := field.KeyFromStr(key)
 
-					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+					if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
 						response = append(response, entry)
 					}
 				}