VOL-1459 : Ensure data model synchronization from kv

- Introduced a new List function to force a load from persistence
- Properly create a proxy for non-keyed nodes (e.g. /adapters)
- Optimized load from persistence operation to avoid existing entries
- Fixed/Enhanced proxy unit test

Change-Id: Ib368d32c517e74410b541bb8927429d066a9cfd0
diff --git a/db/model/node.go b/db/model/node.go
index 67848eb..7ea4417 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -125,10 +125,10 @@
 			for _, change := range changeAnnouncement {
 				log.Debugw("invoking callback",
 					log.Fields{
-						"callbacks": n.GetProxy().getCallbacks(change.Type),
-						"type":change.Type,
+						"callbacks":    n.GetProxy().getCallbacks(change.Type),
+						"type":         change.Type,
 						"previousData": change.PreviousData,
-						"latestData": change.LatestData,
+						"latestData":   change.LatestData,
 					})
 				n.GetRoot().AddCallback(
 					n.GetProxy().InvokeCallbacks,
@@ -248,7 +248,44 @@
 }
 
 // Get retrieves the data from a node tree that resides at the specified path
+func (n *node) List(path string, hash string, depth int, deep bool, txid string) interface{} {
+	log.Debugw("node-list-request", log.Fields{"path": path, "hash": hash, "depth":depth, "deep":deep, "txid":txid})
+	if deep {
+		depth = -1
+	}
+
+	for strings.HasPrefix(path, "/") {
+		path = path[1:]
+	}
+
+	var branch *Branch
+	var rev Revision
+
+	if branch = n.GetBranch(txid); txid == "" || branch == nil {
+		branch = n.GetBranch(NONE)
+	}
+
+	if hash != "" {
+		rev = branch.GetRevision(hash)
+	} else {
+		rev = branch.GetLatest()
+	}
+
+	var result interface{}
+	var prList []interface{}
+	if pr := rev.LoadFromPersistence(path, txid); pr != nil {
+		for _, revEntry := range pr {
+			prList = append(prList, revEntry.GetData())
+		}
+		result = prList
+	}
+
+	return result
+}
+
+// Get retrieves the data from a node tree that resides at the specified path
 func (n *node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
+	log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth":depth, "deep":deep, "txid":txid})
 	if deep {
 		depth = -1
 	}
@@ -355,7 +392,7 @@
 	var modifiedMsg interface{}
 
 	if n.GetProxy() != nil {
-		log.Debug("invoking proxy GET Callbacks : %+v", msg)
+		log.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
 		if modifiedMsg = n.GetProxy().InvokeCallbacks(GET, false, msg); modifiedMsg != nil {
 			msg = modifiedMsg
 		}
@@ -367,6 +404,8 @@
 
 // Update changes the content of a node at the specified path with the provided data
 func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+	log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid":txid, "makeBranch": makeBranch})
+
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
 	}
@@ -396,9 +435,14 @@
 		path = partition[1]
 	}
 
+
 	field := ChildrenFields(n.Type)[name]
 	var children []Revision
 
+	if field == nil {
+		return n.doUpdate(branch, data, strict)
+	}
+
 	if field.IsContainer {
 		if path == "" {
 			log.Errorf("cannot update a list")
@@ -515,12 +559,15 @@
 
 // Add inserts a new node at the specified path with the provided data
 func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+	log.Debugw("node-add-request", log.Fields{"path": path, "txid":txid, "makeBranch": makeBranch})
+
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
 	}
 	if path == "" {
 		// TODO raise error
 		log.Errorf("cannot add for non-container mode")
+		return nil
 	}
 
 	var branch *Branch
@@ -622,6 +669,8 @@
 
 // Remove eliminates a node at the specified path
 func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+	log.Debugw("node-remove-request", log.Fields{"path": path, "txid":txid, "makeBranch": makeBranch})
+
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
 	}
@@ -820,7 +869,8 @@
 	if field.IsContainer {
 		if path == "" {
 			//log.Error("cannot proxy a container field")
-			return n.makeProxy(path, fullPath, parentNode, exclusive)
+			newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
+			return newNode.makeProxy(path, fullPath, parentNode, exclusive)
 		} else if field.Key != "" {
 			partition := strings.SplitN(path, "/", 2)
 			key := partition[0]
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 98e80e4..ecef3ae 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -172,37 +172,44 @@
 		field := ChildrenFields(rev.GetBranch().Node.Type)[name]
 
 		if field.IsContainer {
+			var children []Revision
+			children = make([]Revision, len(rev.GetChildren(name)))
+			copy(children, rev.GetChildren(name))
+			existChildMap := make(map[string]int)
+			for i, child := range rev.GetChildren(name) {
+				existChildMap[child.GetHash()] = i
+			}
+
 			for _, blob := range blobMap {
 				output := blob.Value.([]byte)
 
 				data := reflect.New(field.ClassType.Elem())
 
 				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
-					// TODO report error
+					log.Errorw(
+						"loading-from-persistence--failed-to-unmarshal",
+						log.Fields{"path": path, "txid": txid, "error": err},
+					)
 				} else {
-
-					var children []Revision
-
 					if path == "" {
 						if field.Key != "" {
 							// e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
 							if field.Key != "" {
-								children = make([]Revision, len(rev.GetChildren(name)))
-								copy(children, rev.GetChildren(name))
-
 								_, key := GetAttributeValue(data.Interface(), field.Key, 0)
 
 								childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
 								childRev.SetHash(name + "/" + key.String())
 
-								// Create watch for <component>/<key>
-								pr.SetupWatch(childRev.GetHash())
+								// Do not process a child that is already in memory
+								if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
+									// Create watch for <component>/<key>
+									pr.SetupWatch(childRev.GetHash())
 
-								children = append(children, childRev)
-								rev = rev.UpdateChildren(name, children, rev.GetBranch())
+									children = append(children, childRev)
+									rev = rev.UpdateChildren(name, children, rev.GetBranch())
 
-								rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
+									rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+								}
 								response = append(response, childRev)
 								continue
 							}
@@ -219,9 +226,6 @@
 						}
 						keyValue := field.KeyFromStr(key)
 
-						children = make([]Revision, len(rev.GetChildren(name)))
-						copy(children, rev.GetChildren(name))
-
 						idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
 
 						newChildRev := childRev.LoadFromPersistence(path, txid)
diff --git a/db/model/proxy.go b/db/model/proxy.go
index cfb2ef1..08c0359 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -158,6 +158,30 @@
 	return pathLock, controlled
 }
 
+// List will retrieve information from the data model at the specified path location
+// A list operation will force access to persistence storage
+func (p *Proxy) List(path string, depth int, deep bool, txid string) interface{} {
+	var effectivePath string
+	if path == "/" {
+		effectivePath = p.getFullPath()
+	} else {
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	log.Debugf("Path: %s, Effective: %s, PathLock: %s", path, effectivePath, pathLock)
+
+	pac := PAC().ReservePath(effectivePath, p, pathLock)
+	defer PAC().ReleasePath(pathLock)
+	pac.SetProxy(p)
+
+	rv := pac.List(path, depth, deep, txid, controlled)
+
+	return rv
+}
+
+
 // Get will retrieve information from the data model at the specified path location
 func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
 	var effectivePath string
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 990a61c..a3a7cee 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -159,6 +159,20 @@
 	pac.Proxy = proxy
 }
 
+// List retrieves data linked to a data model path
+func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
+	if control {
+		pac.lock()
+		defer pac.unlock()
+		log.Debugf("controlling list, stack = %s", string(debug.Stack()))
+	}
+
+	// FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
+	// The data traversal through reflection currently corrupts the content
+
+	return pac.getProxy().GetRoot().List(path, "", depth, deep, txid)
+}
+
 // Get retrieves data linked to a data model path
 func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
 	if control {
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index ea15bac..0180ce1 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -33,7 +33,9 @@
 	TestProxy_Root                  *root
 	TestProxy_Root_LogicalDevice    *Proxy
 	TestProxy_Root_Device           *Proxy
+	TestProxy_Root_Adapter          *Proxy
 	TestProxy_DeviceId              string
+	TestProxy_AdapterId             string
 	TestProxy_LogicalDeviceId       string
 	TestProxy_TargetDeviceId        string
 	TestProxy_TargetLogicalDeviceId string
@@ -43,6 +45,7 @@
 	TestProxy_Flows                 *openflow_13.Flows
 	TestProxy_Device                *voltha.Device
 	TestProxy_LogicalDevice         *voltha.LogicalDevice
+	TestProxy_Adapter               *voltha.Adapter
 )
 
 func init() {
@@ -51,6 +54,7 @@
 	TestProxy_Root = NewRoot(&voltha.Voltha{}, nil)
 	TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy("/", false)
 	TestProxy_Root_Device = TestProxy_Root.CreateProxy("/", false)
+	TestProxy_Root_Adapter = TestProxy_Root.CreateProxy("/", false)
 
 	TestProxy_LogicalPorts = []*voltha.LogicalPort{
 		{
@@ -93,6 +97,12 @@
 		Ports:      TestProxy_LogicalPorts,
 		Flows:      TestProxy_Flows,
 	}
+
+	TestProxy_Adapter = &voltha.Adapter{
+		Id:      TestProxy_AdapterId,
+		Vendor:  "test-adapter-vendor",
+		Version: "test-adapter-version",
+	}
 }
 
 func TestProxy_1_1_1_Add_NewDevice(t *testing.T) {
@@ -143,6 +153,39 @@
 	}
 }
 
+func TestProxy_1_1_3_Add_NewAdapter(t *testing.T) {
+	TestProxy_AdapterId = "test-adapter"
+	TestProxy_Adapter.Id = TestProxy_AdapterId
+	preAddExecuted := false
+	postAddExecuted := false
+
+	// Register ADD instructions callbacks
+	TestProxy_Root_Adapter.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions for adapters", &preAddExecuted)
+	TestProxy_Root_Adapter.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions for adapters", &postAddExecuted)
+
+	// Add the adapter
+	if added := TestProxy_Root_Adapter.Add("/adapters", TestProxy_Adapter, ""); added == nil {
+		t.Error("Failed to add adapter")
+	} else {
+		t.Logf("Added adapter : %+v", added)
+	}
+
+	// Verify that the added device can now be retrieved
+	if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+		t.Error("Failed to find added adapter")
+	} else {
+		djson, _ := json.Marshal(d)
+		t.Logf("Found adapter: %s", string(djson))
+	}
+
+	if !preAddExecuted {
+		t.Error("PRE_ADD callback was not executed")
+	}
+	if !postAddExecuted {
+		t.Error("POST_ADD callback was not executed")
+	}
+}
+
 func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
 	devices := TestProxy_Root_Device.Get("/devices", 1, false, "")
 
@@ -270,6 +313,52 @@
 	}
 }
 
+func TestProxy_1_3_3_Update_Adapter(t *testing.T) {
+	preUpdateExecuted := false
+	postUpdateExecuted := false
+
+	adaptersProxy := TestProxy_Root.node.CreateProxy("/adapters", false)
+
+	if retrieved := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
+		t.Error("Failed to get adapter")
+	} else {
+		t.Logf("Found raw adapter (root proxy): %+v", retrieved)
+
+		retrieved.(*voltha.Adapter).Version = "test-adapter-version-2"
+
+		adaptersProxy.RegisterCallback(
+			PRE_UPDATE,
+			commonCallback,
+			"PRE_UPDATE instructions for adapters", &preUpdateExecuted,
+		)
+		adaptersProxy.RegisterCallback(
+			POST_UPDATE,
+			commonCallback,
+			"POST_UPDATE instructions for adapters", &postUpdateExecuted,
+		)
+
+		if afterUpdate := adaptersProxy.Update("/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
+			t.Error("Failed to update adapter")
+		} else {
+			t.Logf("Updated adapter : %+v", afterUpdate)
+		}
+
+		if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+			t.Error("Failed to find updated adapter (root proxy)")
+		} else {
+			djson, _ := json.Marshal(d)
+			t.Logf("Found adapter (root proxy): %s raw: %+v", string(djson), d)
+		}
+
+		if !preUpdateExecuted {
+			t.Error("PRE_UPDATE callback for adapter was not executed")
+		}
+		if !postUpdateExecuted {
+			t.Error("POST_UPDATE callback for adapter was not executed")
+		}
+	}
+}
+
 func TestProxy_1_4_1_Remove_Device(t *testing.T) {
 	preRemoveExecuted := false
 	postRemoveExecuted := false
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
index 1851e27..43bc35f 100644
--- a/ro_core/core/device_manager.go
+++ b/ro_core/core/device_manager.go
@@ -118,11 +118,11 @@
 	return device.Root, nil
 }
 
-// GetDevice retrieves the latest device information from the data model
+// ListDevices retrieves the latest devices from the data model
 func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
 	log.Debug("ListDevices")
 	result := &voltha.Devices{}
-	if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+	if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
 		for _, device := range devices.([]interface{}) {
 			if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
 				agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 376433b..cd662ac 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -197,11 +197,11 @@
 	return device.Root, nil
 }
 
-// GetDevice retrieves the latest device information from the data model
+// ListDevices retrieves the latest devices from the data model
 func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
 	log.Debug("ListDevices")
 	result := &voltha.Devices{}
-	if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+	if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
 		for _, device := range devices.([]interface{}) {
 			if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
 				agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)