Updated the adapter to handle DHCP traps on the NNI and packet-in/out, plus bug fixes.
Tested EAPOL/DHCP/HSIA functionality E2E with EdgeCore OLT and TWSH ONU KIT.

patch: The PON port is now derived from the platform and sent to the core; also includes bug fixes.

Retested EAPOL/DHCP/HSIA use case end to end with EdgeCore OLT and TWSH ONU KIT

Change-Id: I99df82fd7a1385c10878f6fe09ce0d30c48d8e99
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index a56b776..ea99cf7 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -23,7 +23,6 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"reflect"
-	"runtime/debug"
 	"strings"
 	"sync"
 )
@@ -72,10 +71,7 @@
 		return
 	}
 
-	if pair, _ := pr.kvStore.Get(pr.GetName()); pair != nil && skipOnExist {
-		log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash(), "name": pr.GetName()})
-		return
-	}
+	log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
 
 	if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
 		// TODO report error
@@ -89,10 +85,9 @@
 		}
 
 		if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
-			log.Warnw("problem-storing-revision-config",
-				log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+			log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
 		} else {
-			log.Debugw("storing-revision-config", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+			log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
 			pr.isStored = true
 		}
 	}
@@ -100,7 +95,7 @@
 
 func (pr *PersistedRevision) SetupWatch(key string) {
 	if key == "" {
-		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 		return
 	}
 
@@ -111,7 +106,7 @@
 	if pr.events == nil {
 		pr.events = make(chan *kvstore.Event)
 
-		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 
 		pr.SetName(key)
 		pr.events = pr.kvStore.CreateWatch(key)
@@ -120,7 +115,7 @@
 	if !pr.isWatched {
 		pr.isWatched = true
 
-		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 
 		// Start watching
 		go pr.startWatching()
@@ -128,7 +123,7 @@
 }
 
 func (pr *PersistedRevision) startWatching() {
-	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
+	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
 
 StopWatchLoop:
 	for {
@@ -154,17 +149,106 @@
 			case kvstore.PUT:
 				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
 
-				if dataPair, err := pr.kvStore.Get(pr.GetName()); err != nil || dataPair == nil {
-					log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
-				} else {
-					data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
+				data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
 
-					if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
-						log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
-					} else {
-						if pr.GetNode().GetProxy() != nil {
-							pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+				if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
+					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
+				} else {
+					var pathLock string
+					var pac *proxyAccessControl
+					var blobs map[string]*kvstore.KVPair
+
+					// The watch reported new persistence data.
+					// Construct an object that will be used to update the memory
+					blobs = make(map[string]*kvstore.KVPair)
+					key, _ := kvstore.ToString(event.Key)
+					blobs[key] = &kvstore.KVPair{
+						Key:     key,
+						Value:   event.Value,
+						Session: "",
+						Lease:   0,
+					}
+
+					if pr.GetNode().GetProxy() != nil {
+						//
+						// If a proxy exists for this revision, use it to lock access to the path
+						// and prevent simultaneous updates to the object in memory
+						//
+						pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
+
+						//If the proxy already has a request in progress, then there is no need to process the watch
+						log.Debugw("checking-if-path-is-locked", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
+						if PAC().IsReserved(pathLock) {
+							log.Debugw("operation-in-progress", log.Fields{
+								"key":       pr.GetHash(),
+								"path":      pr.GetNode().GetProxy().getFullPath(),
+								"operation": pr.GetNode().GetRoot().GetProxy().Operation,
+							})
+
+							continue
+
+							// TODO/FIXME: keep logic for now in case we need to control based on running operation
+							//
+							// The code below seems to revert the in-memory/persistence value (occasionally) with
+							// the one received from the watch event.
+							//
+							// The same problem may occur, in the scenario where the core owning a device
+							// receives a watch event for an update made by another core, and when the owning core is
+							// also processing an update.  Need to investigate...
+							//
+							//switch pr.GetNode().GetRoot().GetProxy().Operation {
+							//case PROXY_UPDATE:
+							//	// We will need to reload once the update operation completes.
+							//	// Therefore, the data of the current event is most likely out-dated
+							//	// and should be ignored
+							//	log.Debugw("reload-required", log.Fields{
+							//		"key":       pr.GetHash(),
+							//		"path":      pr.GetNode().GetProxy().getFullPath(),
+							//		"operation": pr.GetNode().GetRoot().GetProxy().Operation,
+							//	})
+							//
+							//	// Eliminate the object constructed earlier
+							//	blobs = nil
+							//
+							//case PROXY_ADD:
+							//	fallthrough
+							//
+							//case PROXY_REMOVE:
+							//	fallthrough
+							//
+							//case PROXY_GET:
+							//	fallthrough
+							//
+							//default:
+							//	// No need to process the event ... move on
+							//	log.Debugw("", log.Fields{
+							//		"key":       pr.GetHash(),
+							//		"path":      pr.GetNode().GetProxy().getFullPath(),
+							//		"operation": pr.GetNode().GetRoot().GetProxy().Operation,
+							//	})
+							//
+							//	continue
+							//}
 						}
+
+						// Reserve the path to prevent others to modify while we reload from persistence
+						log.Debugw("reserve-and-lock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+						pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
+						pac.SetProxy(pr.GetNode().GetProxy())
+						pac.lock()
+
+						// Load changes and apply to memory
+						pr.LoadFromPersistence(pr.GetName(), "", blobs)
+
+						log.Debugw("release-and-unlock-path", log.Fields{"key": pr.GetHash(), "path": pathLock})
+						pac.unlock()
+						PAC().ReleasePath(pathLock)
+
+					} else {
+						log.Debugw("revision-with-no-proxy", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
+
+						// Load changes and apply to memory
+						pr.LoadFromPersistence(pr.GetName(), "", blobs)
 					}
 				}
 
@@ -176,7 +260,7 @@
 
 	Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
 
-	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
+	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
 }
 
 // UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -196,7 +280,7 @@
 		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
-		newPR.SetupWatch(newPR.GetName())
+		pr.Drop(branch.Txid, false)
 	} else {
 		newPR.isWatched = true
 		newPR.isStored = true
@@ -206,7 +290,8 @@
 }
 
 // UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
+	branch *Branch) Revision {
 	log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
 
 	newNPR := pr.Revision.UpdateChildren(name, children, branch)
@@ -222,7 +307,6 @@
 		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
-		newPR.SetupWatch(newPR.GetName())
 	} else {
 		newPR.isWatched = true
 		newPR.isStored = true
@@ -248,7 +332,6 @@
 		newPR.isWatched = false
 		newPR.isStored = false
 		pr.Drop(branch.Txid, false)
-		newPR.SetupWatch(newPR.GetName())
 	} else {
 		newPR.isWatched = true
 		newPR.isStored = true
@@ -267,7 +350,7 @@
 // and its associated config when required
 func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
 	log.Debugw("dropping-revision",
-		log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
+		log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
 
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
@@ -297,25 +380,28 @@
 
 // verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
 func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
-	rev := pr
+	//rev := pr
 
-	children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
-	copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+	children := make([]Revision, len(pr.GetBranch().GetLatest().GetChildren(typeName)))
+	copy(children, pr.GetBranch().GetLatest().GetChildren(typeName))
 
 	// Verify if the revision contains a child that matches that key
-	if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+	if childIdx, childRev := pr.GetNode().findRevByKey(pr.GetBranch().GetLatest().GetChildren(typeName), keyName,
+		keyValue); childRev != nil {
 		// A child matching the provided key exists in memory
 		// Verify if the data differs to what was retrieved from persistence
 		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
 			log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
 				"key":  childRev.GetHash(),
 				"name": childRev.GetName(),
+				"data": childRev.GetData(),
 			})
 
 			// Data has changed; replace the child entry and update the parent revision
-			updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
-			updatedChildRev.SetupWatch(updatedChildRev.GetName())
 			childRev.Drop(txid, false)
+			updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+			updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
+			updatedChildRev.SetupWatch(updatedChildRev.GetName())
 
 			if childIdx >= 0 {
 				children[childIdx] = updatedChildRev
@@ -323,18 +409,19 @@
 				children = append(children, updatedChildRev)
 			}
 
-			rev.GetBranch().LatestLock.Lock()
-			updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
-			rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
-			rev.GetBranch().LatestLock.Unlock()
+			pr.GetBranch().LatestLock.Lock()
+			updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
+			pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
+			pr.GetBranch().LatestLock.Unlock()
 
 			// Drop the previous child revision
-			rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+			pr.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
 
 			if updatedChildRev != nil {
 				log.Debugw("verify-persisted-entry--adding-child", log.Fields{
 					"key":  updatedChildRev.GetHash(),
 					"name": updatedChildRev.GetName(),
+					"data": updatedChildRev.GetData(),
 				})
 				response = updatedChildRev
 			}
@@ -343,11 +430,13 @@
 			log.Debugw("verify-persisted-entry--same-data", log.Fields{
 				"key":  childRev.GetHash(),
 				"name": childRev.GetName(),
+				"data": childRev.GetData(),
 			})
 			if childRev != nil {
 				log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
 					"key":  childRev.GetHash(),
 					"name": childRev.GetName(),
+					"data": childRev.GetData(),
 				})
 				response = childRev
 			}
@@ -358,29 +447,32 @@
 		log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
 			"key":  keyValue,
 			"name": typeName,
+			"data": data,
 		})
 
 		// Construct a new child node with the retrieved persistence data
-		childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+		childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
 
 		// We need to start watching this entry for future changes
 		childRev.SetName(typeName + "/" + keyValue)
 
 		// Add the child to the parent revision
-		rev.GetBranch().LatestLock.Lock()
+		pr.GetBranch().LatestLock.Lock()
 		children = append(children, childRev)
-		updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+		updatedRev := pr.GetBranch().Node.Latest().UpdateChildren(typeName, children, pr.GetBranch())
+		updatedRev.GetNode().SetProxy(pr.GetNode().GetProxy())
 		childRev.SetupWatch(childRev.GetName())
 
 		//rev.GetBranch().Node.Latest().Drop(txid, false)
-		rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
-		rev.GetBranch().LatestLock.Unlock()
+		pr.GetBranch().Node.makeLatest(pr.GetBranch(), updatedRev, nil)
+		pr.GetBranch().LatestLock.Unlock()
 
 		// Child entry is valid and can be included in the response object
 		if childRev != nil {
 			log.Debugw("verify-persisted-entry--adding-child", log.Fields{
 				"key":  childRev.GetHash(),
 				"name": childRev.GetName(),
+				"data": childRev.GetData(),
 			})
 			response = childRev
 		}
@@ -391,39 +483,46 @@
 
 // LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
 // by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
 	pr.mutex.Lock()
 	defer pr.mutex.Unlock()
 
 	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
 
 	var response []Revision
-	var rev Revision
 
-	rev = pr
+	for strings.HasPrefix(path, "/") {
+		path = path[1:]
+	}
 
 	if pr.kvStore != nil && path != "" {
-		blobMap, _ := pr.kvStore.List(path)
+		if blobs == nil || len(blobs) == 0 {
+			log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
+			blobs, _ = pr.kvStore.List(path)
+		}
 
 		partition := strings.SplitN(path, "/", 2)
 		name := partition[0]
 
+		var nodeType interface{}
 		if len(partition) < 2 {
 			path = ""
+			nodeType = pr.GetBranch().Node.Type
 		} else {
 			path = partition[1]
+			nodeType = pr.GetBranch().Node.Root.Type
 		}
 
-		field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+		field := ChildrenFields(nodeType)[name]
 
 		if field != nil && field.IsContainer {
 			log.Debugw("load-from-persistence--start-blobs", log.Fields{
 				"path": path,
 				"name": name,
-				"size": len(blobMap),
+				"size": len(blobs),
 			})
 
-			for _, blob := range blobMap {
+			for _, blob := range blobs {
 				output := blob.Value.([]byte)
 
 				data := reflect.New(field.ClassType.Elem())
@@ -440,7 +539,8 @@
 						// based on the field's key attribute
 						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
 
-						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+						if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(),
+							txid); entry != nil {
 							response = append(response, entry)
 						}
 					}
@@ -456,7 +556,8 @@
 					}
 					keyValue := field.KeyFromStr(key)
 
-					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+					if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string),
+						txid); entry != nil {
 						response = append(response, entry)
 					}
 				}
@@ -465,7 +566,8 @@
 			log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
 		} else {
 			log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
-				"type": rev.GetBranch().Node.Type,
+
+				"type": pr.GetBranch().Node.Type,
 				"name": name,
 			})
 		}