VOL-1459 : Ensure data model synchronization from kv
- Introduced a new List function to force a load from persistence
- Properly create a proxy for non-keyed nodes (e.g. /adapters)
- Optimized load from persistence operation to avoid existing entries
- Fixed/Enhanced proxy unit test
Change-Id: Ib368d32c517e74410b541bb8927429d066a9cfd0
diff --git a/db/model/node.go b/db/model/node.go
index 67848eb..7ea4417 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -125,10 +125,10 @@
for _, change := range changeAnnouncement {
log.Debugw("invoking callback",
log.Fields{
- "callbacks": n.GetProxy().getCallbacks(change.Type),
- "type":change.Type,
+ "callbacks": n.GetProxy().getCallbacks(change.Type),
+ "type": change.Type,
"previousData": change.PreviousData,
- "latestData": change.LatestData,
+ "latestData": change.LatestData,
})
n.GetRoot().AddCallback(
n.GetProxy().InvokeCallbacks,
@@ -248,7 +248,44 @@
}
// Get retrieves the data from a node tree that resides at the specified path
+func (n *node) List(path string, hash string, depth int, deep bool, txid string) interface{} {
+ log.Debugw("node-list-request", log.Fields{"path": path, "hash": hash, "depth":depth, "deep":deep, "txid":txid})
+ if deep {
+ depth = -1
+ }
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ var branch *Branch
+ var rev Revision
+
+ if branch = n.GetBranch(txid); txid == "" || branch == nil {
+ branch = n.GetBranch(NONE)
+ }
+
+ if hash != "" {
+ rev = branch.GetRevision(hash)
+ } else {
+ rev = branch.GetLatest()
+ }
+
+ var result interface{}
+ var prList []interface{}
+ if pr := rev.LoadFromPersistence(path, txid); pr != nil {
+ for _, revEntry := range pr {
+ prList = append(prList, revEntry.GetData())
+ }
+ result = prList
+ }
+
+ return result
+}
+
+// Get retrieves the data from a node tree that resides at the specified path
func (n *node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
+ log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth":depth, "deep":deep, "txid":txid})
if deep {
depth = -1
}
@@ -355,7 +392,7 @@
var modifiedMsg interface{}
if n.GetProxy() != nil {
- log.Debug("invoking proxy GET Callbacks : %+v", msg)
+ log.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
if modifiedMsg = n.GetProxy().InvokeCallbacks(GET, false, msg); modifiedMsg != nil {
msg = modifiedMsg
}
@@ -367,6 +404,8 @@
// Update changes the content of a node at the specified path with the provided data
func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+ log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid":txid, "makeBranch": makeBranch})
+
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -396,9 +435,14 @@
path = partition[1]
}
+
field := ChildrenFields(n.Type)[name]
var children []Revision
+ if field == nil {
+ return n.doUpdate(branch, data, strict)
+ }
+
if field.IsContainer {
if path == "" {
log.Errorf("cannot update a list")
@@ -515,12 +559,15 @@
// Add inserts a new node at the specified path with the provided data
func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+ log.Debugw("node-add-request", log.Fields{"path": path, "txid":txid, "makeBranch": makeBranch})
+
for strings.HasPrefix(path, "/") {
path = path[1:]
}
if path == "" {
// TODO raise error
log.Errorf("cannot add for non-container mode")
+ return nil
}
var branch *Branch
@@ -622,6 +669,8 @@
// Remove eliminates a node at the specified path
func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+ log.Debugw("node-remove-request", log.Fields{"path": path, "txid":txid, "makeBranch": makeBranch})
+
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -820,7 +869,8 @@
if field.IsContainer {
if path == "" {
//log.Error("cannot proxy a container field")
- return n.makeProxy(path, fullPath, parentNode, exclusive)
+ newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
+ return newNode.makeProxy(path, fullPath, parentNode, exclusive)
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 98e80e4..ecef3ae 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -172,37 +172,44 @@
field := ChildrenFields(rev.GetBranch().Node.Type)[name]
if field.IsContainer {
+ var children []Revision
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+ existChildMap := make(map[string]int)
+ for i, child := range rev.GetChildren(name) {
+ existChildMap[child.GetHash()] = i
+ }
+
for _, blob := range blobMap {
output := blob.Value.([]byte)
data := reflect.New(field.ClassType.Elem())
if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- // TODO report error
+ log.Errorw(
+ "loading-from-persistence--failed-to-unmarshal",
+ log.Fields{"path": path, "txid": txid, "error": err},
+ )
} else {
-
- var children []Revision
-
if path == "" {
if field.Key != "" {
// e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
if field.Key != "" {
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
childRev.SetHash(name + "/" + key.String())
- // Create watch for <component>/<key>
- pr.SetupWatch(childRev.GetHash())
+ // Do not process a child that is already in memory
+ if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
+ // Create watch for <component>/<key>
+ pr.SetupWatch(childRev.GetHash())
- children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, rev.GetBranch())
+ children = append(children, childRev)
+ rev = rev.UpdateChildren(name, children, rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
+ }
response = append(response, childRev)
continue
}
@@ -219,9 +226,6 @@
}
keyValue := field.KeyFromStr(key)
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
newChildRev := childRev.LoadFromPersistence(path, txid)
diff --git a/db/model/proxy.go b/db/model/proxy.go
index cfb2ef1..08c0359 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -158,6 +158,30 @@
return pathLock, controlled
}
+// List will retrieve information from the data model at the specified path location
+// A list operation will force access to persistence storage
+func (p *Proxy) List(path string, depth int, deep bool, txid string) interface{} {
+ var effectivePath string
+ if path == "/" {
+ effectivePath = p.getFullPath()
+ } else {
+ effectivePath = p.getFullPath() + path
+ }
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ log.Debugf("Path: %s, Effective: %s, PathLock: %s", path, effectivePath, pathLock)
+
+ pac := PAC().ReservePath(effectivePath, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
+
+ rv := pac.List(path, depth, deep, txid, controlled)
+
+ return rv
+}
+
+
// Get will retrieve information from the data model at the specified path location
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 990a61c..a3a7cee 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -159,6 +159,20 @@
pac.Proxy = proxy
}
+// List retrieves data linked to a data model path
+func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
+ if control {
+ pac.lock()
+ defer pac.unlock()
+ log.Debugf("controlling list, stack = %s", string(debug.Stack()))
+ }
+
+ // FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
+ // The data traversal through reflection currently corrupts the content
+
+ return pac.getProxy().GetRoot().List(path, "", depth, deep, txid)
+}
+
// Get retrieves data linked to a data model path
func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index ea15bac..0180ce1 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -33,7 +33,9 @@
TestProxy_Root *root
TestProxy_Root_LogicalDevice *Proxy
TestProxy_Root_Device *Proxy
+ TestProxy_Root_Adapter *Proxy
TestProxy_DeviceId string
+ TestProxy_AdapterId string
TestProxy_LogicalDeviceId string
TestProxy_TargetDeviceId string
TestProxy_TargetLogicalDeviceId string
@@ -43,6 +45,7 @@
TestProxy_Flows *openflow_13.Flows
TestProxy_Device *voltha.Device
TestProxy_LogicalDevice *voltha.LogicalDevice
+ TestProxy_Adapter *voltha.Adapter
)
func init() {
@@ -51,6 +54,7 @@
TestProxy_Root = NewRoot(&voltha.Voltha{}, nil)
TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy("/", false)
TestProxy_Root_Device = TestProxy_Root.CreateProxy("/", false)
+ TestProxy_Root_Adapter = TestProxy_Root.CreateProxy("/", false)
TestProxy_LogicalPorts = []*voltha.LogicalPort{
{
@@ -93,6 +97,12 @@
Ports: TestProxy_LogicalPorts,
Flows: TestProxy_Flows,
}
+
+ TestProxy_Adapter = &voltha.Adapter{
+ Id: TestProxy_AdapterId,
+ Vendor: "test-adapter-vendor",
+ Version: "test-adapter-version",
+ }
}
func TestProxy_1_1_1_Add_NewDevice(t *testing.T) {
@@ -143,6 +153,39 @@
}
}
+func TestProxy_1_1_3_Add_NewAdapter(t *testing.T) {
+ TestProxy_AdapterId = "test-adapter"
+ TestProxy_Adapter.Id = TestProxy_AdapterId
+ preAddExecuted := false
+ postAddExecuted := false
+
+ // Register ADD instructions callbacks
+ TestProxy_Root_Adapter.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions for adapters", &preAddExecuted)
+ TestProxy_Root_Adapter.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions for adapters", &postAddExecuted)
+
+ // Add the adapter
+ if added := TestProxy_Root_Adapter.Add("/adapters", TestProxy_Adapter, ""); added == nil {
+ t.Error("Failed to add adapter")
+ } else {
+ t.Logf("Added adapter : %+v", added)
+ }
+
+ // Verify that the added device can now be retrieved
+ if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find added adapter")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found adapter: %s", string(djson))
+ }
+
+ if !preAddExecuted {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !postAddExecuted {
+ t.Error("POST_ADD callback was not executed")
+ }
+}
+
func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
devices := TestProxy_Root_Device.Get("/devices", 1, false, "")
@@ -270,6 +313,52 @@
}
}
+func TestProxy_1_3_3_Update_Adapter(t *testing.T) {
+ preUpdateExecuted := false
+ postUpdateExecuted := false
+
+ adaptersProxy := TestProxy_Root.node.CreateProxy("/adapters", false)
+
+ if retrieved := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get adapter")
+ } else {
+ t.Logf("Found raw adapter (root proxy): %+v", retrieved)
+
+ retrieved.(*voltha.Adapter).Version = "test-adapter-version-2"
+
+ adaptersProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback,
+ "PRE_UPDATE instructions for adapters", &preUpdateExecuted,
+ )
+ adaptersProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback,
+ "POST_UPDATE instructions for adapters", &postUpdateExecuted,
+ )
+
+ if afterUpdate := adaptersProxy.Update("/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
+ t.Error("Failed to update adapter")
+ } else {
+ t.Logf("Updated adapter : %+v", afterUpdate)
+ }
+
+ if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated adapter (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found adapter (root proxy): %s raw: %+v", string(djson), d)
+ }
+
+ if !preUpdateExecuted {
+ t.Error("PRE_UPDATE callback for adapter was not executed")
+ }
+ if !postUpdateExecuted {
+ t.Error("POST_UPDATE callback for adapter was not executed")
+ }
+ }
+}
+
func TestProxy_1_4_1_Remove_Device(t *testing.T) {
preRemoveExecuted := false
postRemoveExecuted := false
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
index 1851e27..43bc35f 100644
--- a/ro_core/core/device_manager.go
+++ b/ro_core/core/device_manager.go
@@ -118,11 +118,11 @@
return device.Root, nil
}
-// GetDevice retrieves the latest device information from the data model
+// ListDevices retrieves the latest devices from the data model
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+ if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 376433b..cd662ac 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -197,11 +197,11 @@
return device.Root, nil
}
-// GetDevice retrieves the latest device information from the data model
+// ListDevices retrieves the latest devices from the data model
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+ if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)