VOL-1283: Fixed callback execution consistency for all proxy levels
- Callbacks are executed at any proxy levels
- Uncovered some issues with the base children fields structure
- cleaned up the root/node structures
- Ensure that a get command returns a clone.
Change-Id: Ic2cd5420c29332bd9b5d6f303a7fd9d0d0ccaf06
diff --git a/db/model/child_type.go b/db/model/child_type.go
index d503b8d..a96883c 100644
--- a/db/model/child_type.go
+++ b/db/model/child_type.go
@@ -74,12 +74,12 @@
if proto.HasExtension(options, common.E_ChildNode) {
isContainer := *field.Label == descriptor.FieldDescriptorProto_LABEL_REPEATED
meta, _ := proto.GetExtension(options, common.E_ChildNode)
- var keyFromStr func(string) interface{}
- if meta.(*common.ChildNode).GetKey() == "" {
- //log.Debugf("Child key is empty ... moving on")
- } else {
- parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
+ var keyFromStr func(string) interface{}
+ var ct ChildType
+
+ parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
+ if meta.(*common.ChildNode).GetKey() != "" {
keyType := FindKeyOwner(reflect.New(parentType).Elem().Interface(), meta.(*common.ChildNode).GetKey(), 0)
switch keyType.(reflect.Type).Name() {
@@ -110,18 +110,17 @@
default:
log.Errorf("Key type not implemented - type: %s\n", keyType.(reflect.Type))
}
-
- ct := ChildType{
- ClassModule: parentType.String(),
- ClassType: parentType,
- IsContainer: isContainer,
- Key: meta.(*common.ChildNode).GetKey(),
- KeyFromStr: keyFromStr,
- }
-
- names[field.GetName()] = &ct
-
}
+
+ ct = ChildType{
+ ClassModule: parentType.String(),
+ ClassType: parentType,
+ IsContainer: isContainer,
+ Key: meta.(*common.ChildNode).GetKey(),
+ KeyFromStr: keyFromStr,
+ }
+
+ names[field.GetName()] = &ct
}
}
}
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
index fcfe949..4659805 100644
--- a/db/model/child_type_test.go
+++ b/db/model/child_type_test.go
@@ -17,7 +17,6 @@
import (
"github.com/opencord/voltha-go/protos/voltha"
- "reflect"
"testing"
)
@@ -41,9 +40,9 @@
names := ChildrenFields(cls)
- tst := reflect.ValueOf(cls).Elem().FieldByName("ImageDownloads")
-
- t.Logf("############ Field by name : %+v\n", reflect.TypeOf(tst.Interface()))
+ //tst := reflect.ValueOf(cls).Elem().FieldByName("ImageDownloads")
+ //
+ //t.Logf("############ Field by name : %+v\n", reflect.TypeOf(tst.Interface()))
if names == nil || len(names) == 0 {
t.Errorf("ChildrenFields failed to return names: %+v\n", names)
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
index c4dfbdc..aaeb7ac 100644
--- a/db/model/event_bus.go
+++ b/db/model/event_bus.go
@@ -17,6 +17,7 @@
import (
"encoding/json"
+ "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/protos/voltha"
)
@@ -47,11 +48,11 @@
//func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
func (bus *EventBus) Advertise(args ...interface{}) interface{} {
eventType := args[0].(CallbackType)
- data := args[1]
- hash := args[2].(string)
+ hash := args[1].(string)
+ data := args[2:]
if _, ok := IGNORED_CALLBACKS[eventType]; ok {
- log.Debugf("ignoring event - type:%s, data:%+v\n", eventType, data)
+ log.Debugf("ignoring event - type:%s, data:%+v", eventType, data)
}
var kind voltha.ConfigEventType_ConfigEventType
switch eventType {
@@ -66,11 +67,15 @@
var msg []byte
var err error
if IsProtoMessage(data) {
- if msg, err = json.Marshal(data); err != nil {
- log.Errorf("problem marshalling data: %+v, err:%s\n", data, err.Error())
+ if msg, err = proto.Marshal(data[0].(proto.Message)); err != nil {
+ log.Errorf("problem marshalling proto data: %+v, err:%s", data[0], err.Error())
+ }
+ } else if data[0] != nil {
+ if msg, err = json.Marshal(data[0]); err != nil {
+ log.Errorf("problem marshalling json data: %+v, err:%s", data[0], err.Error())
}
} else {
- msg = data.([]byte)
+ log.Errorf("no data to advertise : %+v", data[0])
}
event := voltha.ConfigEvent{
diff --git a/db/model/model.go b/db/model/model.go
index 2b95280..1312a41 100644
--- a/db/model/model.go
+++ b/db/model/model.go
@@ -20,6 +20,6 @@
)
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "DB_MODEL"})
+ log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "DB_MODEL"})
defer log.CleanUp()
}
diff --git a/db/model/node.go b/db/model/node.go
index 75f6915..4a92208 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -48,10 +48,14 @@
FoldTxBranch(txid string)
GetProxy(path string, exclusive bool) *Proxy
+
+ ExecuteCallbacks()
+ AddCallback(callback CallbackFunction, args ...interface{})
+ AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
type node struct {
- root *root
+ Root *root
Type interface{}
Branches map[string]*Branch
Tags map[string]Revision
@@ -69,7 +73,7 @@
func NewNode(root *root, initialData interface{}, autoPrune bool, txid string) *node {
n := &node{}
- n.root = root
+ n.Root = root
n.Branches = make(map[string]*Branch)
n.Tags = make(map[string]Revision)
n.Proxy = nil
@@ -91,11 +95,15 @@
}
func (n *node) MakeNode(data interface{}, txid string) *node {
- return NewNode(n.root, data, true, txid)
+ return NewNode(n.Root, data, true, txid)
}
func (n *node) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- return n.root.MakeRevision(branch, data, children)
+ if n.Root.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
+ return NewPersistedRevision(branch, data, children)
+ }
+
+ return NewNonPersistedRevision(branch, data, children)
}
func (n *node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
@@ -113,17 +121,30 @@
if changeAnnouncement != nil && branch.Txid == "" {
if n.Proxy != nil {
for _, change := range changeAnnouncement {
- log.Debugf("invoking callback - changeType: %+v, previous:%+v, latest: %+v\n", change.Type,
- change.PreviousData, change.LatestData)
- n.root.AddCallback(n.Proxy.InvokeCallbacks, change.Type, change.PreviousData, change.LatestData, true)
+ log.Debugf("invoking callback - changeType: %+v, previous:%+v, latest: %+v",
+ change.Type,
+ change.PreviousData,
+ change.LatestData)
+ n.Root.AddCallback(
+ n.Proxy.InvokeCallbacks,
+ change.Type,
+ true,
+ change.PreviousData,
+ change.LatestData)
}
}
for _, change := range changeAnnouncement {
- log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v\n", change.Type,
+ log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v",
+ change.Type,
change.PreviousData,
change.LatestData)
- n.root.AddNotificationCallback(n.makeEventBus().Advertise, change.Type, change.PreviousData, change.LatestData, revision.GetHash())
+ n.Root.AddNotificationCallback(
+ n.makeEventBus().Advertise,
+ change.Type,
+ revision.GetHash(),
+ change.PreviousData,
+ change.LatestData)
}
}
}
@@ -205,7 +226,7 @@
fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
- log.Debugf("fieldValue: %+v, type: %+v, value: %+v", fieldValue.Interface(), fieldValue.Type(), value)
+ //log.Debugf("fieldValue: %+v, type: %+v, value: %+v", fieldValue.Interface(), fieldValue.Type(), value)
a := fmt.Sprintf("%s", fieldValue.Interface())
b := fmt.Sprintf("%s", value)
if a == b {
@@ -313,7 +334,7 @@
if n.Proxy != nil {
log.Debug("invoking proxy GET Callbacks")
- msg = n.Proxy.InvokeCallbacks(GET, msg, false)
+ msg = n.Proxy.InvokeCallbacks(GET, false, msg)
}
return msg
@@ -359,7 +380,7 @@
if field.IsContainer {
if path == "" {
- log.Errorf("cannot update a list\n")
+ log.Errorf("cannot update a list")
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
@@ -376,7 +397,9 @@
}
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
+ childNode.Proxy = n.Proxy
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+
if newChildRev.GetHash() == childRev.GetHash() {
if newChildRev != childRev {
log.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
@@ -390,15 +413,15 @@
_newKeyType := fmt.Sprintf("%s", newKey)
_keyValueType := fmt.Sprintf("%s", keyValue)
if _newKeyType != _keyValueType {
- log.Errorf("cannot change key field\n")
+ log.Errorf("cannot change key field")
}
children[idx] = newChildRev
rev = rev.UpdateChildren(name, children, branch)
branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, nil)
+ n.MakeLatest(branch, rev, nil)
return rev
} else {
- log.Errorf("cannot index into container with no keys\n")
+ log.Errorf("cannot index into container with no keys")
}
} else {
childRev := rev.GetChildren()[name][0]
@@ -406,7 +429,7 @@
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
rev = rev.UpdateChildren(name, []Revision{newChildRev}, branch)
branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, nil)
+ n.MakeLatest(branch, rev, nil)
return rev
}
return nil
@@ -428,7 +451,7 @@
if n.Proxy != nil {
log.Debug("invoking proxy PRE_UPDATE Callbacks")
- n.Proxy.InvokeCallbacks(PRE_UPDATE, data, false)
+ n.Proxy.InvokeCallbacks(PRE_UPDATE, false, branch.Latest.GetData(), data)
}
if !reflect.DeepEqual(branch.Latest.GetData(), data) {
if strict {
@@ -438,7 +461,7 @@
rev := branch.Latest.UpdateData(data, branch)
changes := []ChangeTuple{{POST_UPDATE, branch.Latest.GetData(), rev.GetData()}}
branch.Latest.Drop(branch.Txid, true)
- n.root.MakeLatest(branch, rev, changes)
+ n.MakeLatest(branch, rev, changes)
return rev
} else {
return branch.Latest
@@ -454,7 +477,7 @@
}
if path == "" {
// TODO raise error
- log.Errorf("cannot add for non-container mode\n")
+ log.Errorf("cannot add for non-container mode")
}
var branch *Branch
@@ -484,7 +507,7 @@
if field.Key != "" {
if n.Proxy != nil {
log.Debug("invoking proxy PRE_ADD Callbacks")
- n.Proxy.InvokeCallbacks(PRE_ADD, data, false)
+ n.Proxy.InvokeCallbacks(PRE_ADD, false, data)
}
for _, v := range rev.GetChildren()[name] {
@@ -492,20 +515,21 @@
children = append(children, revCopy)
}
_, key := GetAttributeValue(data, field.Key, 0)
- if _, rev := n.findRevByKey(children, field.Key, key.String()); rev != nil {
+ if _, exists := n.findRevByKey(children, field.Key, key.String()); exists != nil {
// TODO raise error
log.Errorf("duplicate key found: %s", key.String())
+ } else {
+ childRev := n.MakeNode(data, txid).Latest(txid)
+ children = append(children, childRev)
+ rev := rev.UpdateChildren(name, children, branch)
+ changes := []ChangeTuple{{POST_ADD, nil, rev.GetData()}}
+ branch.Latest.Drop(txid, false)
+ n.MakeLatest(branch, rev, changes)
+ return rev
}
- childRev := n.MakeNode(data, txid).Latest(txid)
- children = append(children, childRev)
- rev := rev.UpdateChildren(name, children, branch)
- changes := []ChangeTuple{{POST_ADD, branch.Latest.GetData(), rev.GetData()}}
- branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, changes)
- return rev
} else {
- log.Errorf("cannot add to non-keyed container\n")
+ log.Errorf("cannot add to non-keyed container")
}
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
@@ -523,13 +547,13 @@
children[idx] = newChildRev
rev := rev.UpdateChildren(name, children, branch)
branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, nil)
+ n.MakeLatest(branch, rev, nil)
return rev
} else {
- log.Errorf("cannot add to non-keyed container\n")
+ log.Errorf("cannot add to non-keyed container")
}
} else {
- log.Errorf("cannot add to non-container field\n")
+ log.Errorf("cannot add to non-container field")
}
return nil
}
@@ -543,7 +567,7 @@
}
if path == "" {
// TODO raise error
- log.Errorf("cannot remove for non-container mode\n")
+ log.Errorf("cannot remove for non-container mode")
}
var branch *Branch
var ok bool
@@ -569,7 +593,7 @@
if field.IsContainer {
if path == "" {
- log.Errorf("cannot remove without a key\n")
+ log.Errorf("cannot remove without a key")
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
@@ -590,7 +614,7 @@
children[idx] = newChildRev
rev := rev.UpdateChildren(name, children, branch)
branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, nil)
+ n.MakeLatest(branch, rev, nil)
return rev
} else {
for _, v := range rev.GetChildren()[name] {
@@ -600,7 +624,7 @@
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
if n.Proxy != nil {
data := childRev.GetData()
- n.Proxy.InvokeCallbacks(PRE_REMOVE, data, false)
+ n.Proxy.InvokeCallbacks(PRE_REMOVE, false, data)
postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data, nil})
} else {
postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData(), nil})
@@ -609,14 +633,14 @@
children = append(children[:idx], children[idx+1:]...)
rev := rev.UpdateChildren(name, children, branch)
branch.Latest.Drop(txid, false)
- n.root.MakeLatest(branch, rev, postAnnouncement)
+ n.MakeLatest(branch, rev, postAnnouncement)
return rev
}
} else {
- log.Errorf("cannot add to non-keyed container\n")
+ log.Errorf("cannot add to non-keyed container")
}
} else {
- log.Errorf("cannot add to non-container field\n")
+ log.Errorf("cannot add to non-container field")
}
return nil
@@ -661,7 +685,7 @@
rev, changes := Merge3Way(forkRev, srcRev, dstRev, n.mergeChild(txid, dryRun), dryRun)
if !dryRun {
- n.root.MakeLatest(dstBranch, rev, changes)
+ n.MakeLatest(dstBranch, rev, changes)
delete(n.Branches, txid)
}
@@ -707,31 +731,34 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
func (n *node) GetProxy(path string, exclusive bool) *Proxy {
- r := NewRoot(n.Type, n.root.KvStore)
- r.node = *n
- r.KvStore = n.root.KvStore
+ //r := NewRoot(n.Type, n.KvStore)
+ //r.node = n
+ //r.KvStore = n.KvStore
- return n.getProxy(path, r, path, exclusive)
+ return n.getProxy(path, path, exclusive)
}
-func (n *node) getProxy(path string, root Root, fullPath string, exclusive bool) *Proxy {
+func (n *node) getProxy(path string, fullPath string, exclusive bool) *Proxy {
for strings.HasPrefix(path, "/") {
path = path[1:]
}
if path == "" {
- return n.makeProxy(root, path, exclusive)
+ return n.makeProxy(path, exclusive)
}
rev := n.Branches[NONE].Latest
partition := strings.SplitN(path, "/", 2)
name := partition[0]
- path = partition[1]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
field := ChildrenFields(n.Type)[name]
- if field.IsContainer {
+ if field != nil && field.IsContainer {
if path == "" {
log.Error("cannot proxy a container field")
- }
- if field.Key != "" {
+ } else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
if len(partition) < 2 {
@@ -740,29 +767,40 @@
path = partition[1]
}
keyValue := field.KeyFromStr(key)
- children := rev.GetChildren()[name]
+ var children []Revision
+ for _, v := range rev.GetChildren()[name] {
+ newV := reflect.ValueOf(v).Interface().(Revision)
+ children = append(children, newV)
+ }
_, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
- r := NewRoot(childNode.Type, n.root.KvStore)
- r.node = *childNode
- r.KvStore = childNode.root.KvStore
-
- return childNode.getProxy(path, r, fullPath, exclusive)
+ return childNode.getProxy(path, fullPath, exclusive)
+ } else {
+ log.Error("cannot index into container with no keys")
}
- log.Error("cannot index into container with no keys")
} else {
childRev := rev.GetChildren()[name][0]
childNode := childRev.GetNode()
- return childNode.getProxy(path, root, fullPath, exclusive)
+ return childNode.getProxy(path, fullPath, exclusive)
}
return nil
}
-func (n *node) makeProxy(root Root, fullPath string, exclusive bool) *Proxy {
+func (n *node) makeProxy(fullPath string, exclusive bool) *Proxy {
+ r := &root{
+ node: n,
+ Callbacks: n.Root.Callbacks,
+ NotificationCallbacks: n.Root.NotificationCallbacks,
+ DirtyNodes: n.Root.DirtyNodes,
+ KvStore: n.Root.KvStore,
+ Loading: n.Root.Loading,
+ RevisionClass: n.Root.RevisionClass,
+ }
+
if n.Proxy == nil {
- n.Proxy = NewProxy(root, n, fullPath, exclusive)
+ n.Proxy = NewProxy(r, fullPath, exclusive)
} else {
if n.Proxy.Exclusive {
log.Error("node is already owned exclusively")
@@ -780,20 +818,14 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Persistence Loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-func (n *node) LoadLatest(kvStore *Backend, hash string) {
+func (n *node) LoadLatest(hash string) {
branch := NewBranch(n, "", nil, n.AutoPrune)
pr := &PersistedRevision{}
- rev := pr.Load(branch, kvStore, n.Type, hash)
+ rev := pr.Load(branch, n.Root.KvStore, n.Type, hash)
n.makeLatest(branch, rev, nil)
n.Branches[NONE] = branch
}
-func (n *node) MakeTxBranch() string {
- return n.root.MakeTxBranch()
-}
-func (n *node) FoldTxBranch(txid string) {
- n.root.FoldTxBranch(txid)
-}
-func (n *node) DeleteTxBranch(txid string) {
- n.root.DeleteTxBranch(txid)
+func (n *node) ExecuteCallbacks() {
+ n.Root.ExecuteCallbacks()
}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 99166e5..29cadf7 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -19,7 +19,7 @@
"bytes"
"crypto/md5"
"fmt"
- "github.com/opencord/voltha-go/common/log"
+ "github.com/golang/protobuf/proto"
"reflect"
"sort"
)
@@ -156,25 +156,29 @@
}
}
} else {
- rev := npr.Children[fieldName][0]
- childData := rev.Get(depth - 1)
- foundEntry := false
- for i := 0; i < childDataHolder.Len(); i++ {
- if reflect.DeepEqual(childDataHolder.Index(i).Interface(), childData) {
- foundEntry = true
- break
+ if revs := npr.Children[fieldName]; revs != nil && len(revs) > 0 {
+ rev := npr.Children[fieldName][0]
+ if rev != nil {
+ childData := rev.Get(depth - 1)
+ if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
+ childDataHolder = reflect.ValueOf(childData)
+ }
}
}
- if !foundEntry {
- // avoid duplicates by adding if the child was not found in the holder
- childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
- }
}
// Merge child data with cloned object
reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
}
}
- return data
+
+ result := data
+
+ if result != nil {
+ clone := proto.Clone(data.(proto.Message))
+ result = reflect.ValueOf(clone).Interface()
+ }
+
+ return result
}
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
@@ -183,7 +187,6 @@
newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
newRev.SetBranch(branch)
- log.Debugf("newRev config : %+v, npr: %+v", newRev.GetConfig(), npr)
newRev.SetConfig(NewDataRevision(data))
newRev.Finalize()
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 774b77e..b62c569 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -34,7 +34,7 @@
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
- pr.kvStore = branch.Node.root.KvStore
+ pr.kvStore = branch.Node.Root.KvStore
pr.Revision = NewNonPersistedRevision(branch, data, children)
pr.Finalize()
return pr
@@ -64,7 +64,9 @@
for fieldName, children := range pr.GetChildren() {
hashes := []string{}
for _, rev := range children {
- hashes = append(hashes, rev.GetHash())
+ if rev != nil {
+ hashes = append(hashes, rev.GetHash())
+ }
}
childrenHashes[fieldName] = hashes
}
@@ -119,7 +121,7 @@
var children []Revision
for _, childHash := range childrenHashes[fieldName] {
childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
- childNode.LoadLatest(kvStore, childHash)
+ childNode.LoadLatest(childHash)
childRev := childNode.Latest()
children = append(children, childRev)
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 0f5ddc7..decbf9b 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -48,18 +48,16 @@
}
type Proxy struct {
- Root Root
- Node Node
+ Root *root
Path string
Exclusive bool
Callbacks map[CallbackType]map[string]CallbackTuple
}
-func NewProxy(root Root, node Node, path string, exclusive bool) *Proxy {
+func NewProxy(root *root, path string, exclusive bool) *Proxy {
callbacks := make(map[CallbackType]map[string]CallbackTuple)
p := &Proxy{
Root: root,
- Node: node,
Exclusive: exclusive,
Path: path,
Callbacks: callbacks,
@@ -68,7 +66,7 @@
}
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
- return p.Node.Get(path, "", depth, deep, txid)
+ return p.Root.Get(path, "", depth, deep, txid)
}
func (p *Proxy) Update(path string, data interface{}, strict bool, txid string) interface{} {
@@ -82,6 +80,7 @@
} else {
fullPath = p.Path + path
}
+
return p.Root.Update(fullPath, data, strict, txid, nil)
}
@@ -126,20 +125,19 @@
p.Root.DeleteTxBranch(txid)
}
-//type CallbackFunction func(context context.Context, args ...interface{})
type CallbackFunction func(args ...interface{}) interface{}
type CallbackTuple struct {
callback CallbackFunction
args []interface{}
}
-func (tuple *CallbackTuple) Execute(context interface{}) interface{} {
- newArgs := []interface{}{}
- if context != nil {
- newArgs = append(newArgs, context)
+func (tuple *CallbackTuple) Execute(contextArgs interface{}) interface{} {
+ args := []interface{}{}
+ args = append(args, tuple.args...)
+ if contextArgs != nil {
+ args = append(args, contextArgs)
}
- newArgs = append(newArgs, tuple.args...)
- return tuple.callback(newArgs...)
+ return tuple.callback(args...)
}
func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
@@ -169,7 +167,7 @@
delete(p.Callbacks[callbackType], funcHash)
}
-func (p *Proxy) invoke(callback CallbackTuple, context interface{}) (result interface{}, err error) {
+func (p *Proxy) invoke(callback CallbackTuple, context ...interface{}) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
errStr := fmt.Sprintf("callback error occurred: %+v", r)
@@ -183,17 +181,16 @@
return result, err
}
-//func (p *Proxy) InvokeCallbacks(callbackType CallbackType, context context.Context, proceedOnError bool) {
-func (p *Proxy) InvokeCallbacks(args ...interface{}) interface{} {
+func (p *Proxy) InvokeCallbacks(args ...interface{}) (result interface{}) {
callbackType := args[0].(CallbackType)
- context := args[1]
- proceedOnError := args[2].(bool)
+ proceedOnError := args[1].(bool)
+ context := args[2:]
var err error
if _, exists := p.Callbacks[callbackType]; exists {
for _, callback := range p.Callbacks[callbackType] {
- if context, err = p.invoke(callback, context); err != nil {
+ if result, err = p.invoke(callback, context); err != nil {
if !proceedOnError {
log.Info("An error occurred. Stopping callback invocation")
break
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 45329fd..6226caa 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -21,6 +21,7 @@
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"reflect"
"strconv"
@@ -47,8 +48,41 @@
DbPort: 2379,
DbTimeout: 5,
}
+ ports = []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "test-port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "etcd_port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+
+ stats = &openflow_13.OfpFlowStats{
+ Id: 1111,
+ }
+ flows = &openflow_13.Flows{
+ Items: []*openflow_13.OfpFlowStats{stats},
+ }
+ device = &voltha.Device{
+ Id: devId,
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ Flows: flows,
+ Ports: ports,
+ }
devId string
targetDeviceId string
+
+ preAddExecuted = false
+ postAddExecuted = false
+ preUpdateExecuted = false
+ postUpdateExecuted = false
+ preRemoveExecuted = false
+ postRemoveExecuted = false
)
func init() {
@@ -68,7 +102,60 @@
pt.Proxy = pt.Root.GetProxy("/", false)
}
-func Test_Proxy_1_GetDevices(t *testing.T) {
+func commonCallback(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %s", len(args))
+
+ for i := 0; i < len(args); i++ {
+ log.Infof("ARG %d : %+v", i, args[i])
+ }
+ execStatus := args[1].(*bool)
+
+ // Inform the caller that the callback was executed
+ *execStatus = true
+
+ return nil
+}
+
+func Test_Proxy_1_1_Add_NewDevice(t *testing.T) {
+ devIdBin, _ := uuid.New().MarshalBinary()
+ devId = "0001" + hex.EncodeToString(devIdBin)[:12]
+
+ pt.Proxy.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions", &preAddExecuted)
+ pt.Proxy.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions", &postAddExecuted)
+
+ device.Id = devId
+ if added := pt.Proxy.Add("/devices", device, ""); added == nil {
+ t.Error("Failed to add device")
+ } else {
+ t.Logf("Added device : %+v", added)
+ }
+
+ if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find added device")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device: %s", string(djson))
+ }
+
+ if !preAddExecuted {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !postAddExecuted {
+ t.Error("POST_ADD callback was not executed")
+ }
+}
+
+func Test_Proxy_1_2_Add_ExistingDevice(t *testing.T) {
+ device.Id = devId
+ if added := pt.Proxy.Add("/devices", device, ""); added == nil {
+ t.Logf("Successfully detected that the device already exists: %s", devId)
+ } else {
+ t.Errorf("A new device should not have been created : %+v", added)
+ }
+
+}
+
+func Test_Proxy_2_1_Get_AllDevices(t *testing.T) {
devices := pt.Proxy.Get("/devices", 1, false, "")
if len(devices.([]interface{})) == 0 {
@@ -76,64 +163,132 @@
} else {
// Save the target device id for later tests
targetDeviceId = devices.([]interface{})[0].(*voltha.Device).Id
- t.Logf("retrieved devices: %+v", devices)
+ t.Logf("retrieved all devices: %+v", devices)
}
}
-func Test_Proxy_2_AddDevice(t *testing.T) {
- devIdBin, _ := uuid.New().MarshalBinary()
- devId = "0001" + hex.EncodeToString(devIdBin)[:12]
-
- ports := []*voltha.Port{
- {
- PortNo: 123,
- Label: "test-port-0",
- Type: voltha.Port_PON_OLT,
- AdminState: common.AdminState_ENABLED,
- OperStatus: common.OperStatus_ACTIVE,
- DeviceId: "etcd_port-0-device-id",
- Peers: []*voltha.Port_PeerPort{},
- },
- }
-
- device := &voltha.Device{
- Id: devId,
- Type: "simulated_olt",
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- AdminState: voltha.AdminState_PREPROVISIONED,
- Ports: ports,
- }
-
- if added := pt.Proxy.Add("/devices", device, ""); added == nil {
- t.Error("Failed to add device")
- } else {
- t.Logf("Added device : %+v", added)
- }
-}
-
-func Test_Proxy_3_GetDevice_PostAdd(t *testing.T) {
- if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Error("Failed to find added device")
+func Test_Proxy_2_2_Get_SingleDevice(t *testing.T) {
+ if d := pt.Proxy.Get("/devices/"+targetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", targetDeviceId)
} else {
djson, _ := json.Marshal(d)
+ t.Logf("Found device: %s", string(djson))
+ }
- t.Logf("Found device: count: %s", djson)
+}
+
+func Test_Proxy_3_1_Update_Device_WithRootProxy(t *testing.T) {
+ if retrieved := pt.Proxy.Get("/devices/"+targetDeviceId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get device")
+ } else {
+ var fwVersion int
+ if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
+ fwVersion += 1
+ }
+
+ preUpdateExecuted = false
+ postUpdateExecuted = false
+
+ pt.Proxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback,
+ "PRE_UPDATE instructions (root proxy)", &preUpdateExecuted,
+ )
+ pt.Proxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback,
+ "POST_UPDATE instructions (root proxy)", &postUpdateExecuted,
+ )
+
+ //cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
+ //cloned.FirmwareVersion = strconv.Itoa(fwVersion)
+ retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
+ //t.Logf("Before update : %+v", cloned)
+
+ if afterUpdate := pt.Proxy.Update("/devices/"+targetDeviceId, retrieved, false, ""); afterUpdate == nil {
+ t.Error("Failed to update device")
+ } else {
+ t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
+ }
+ if d := pt.Proxy.Get("/devices/"+targetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated device (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device (root proxy): %s", string(djson))
+ }
+
+ if !preUpdateExecuted {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !postUpdateExecuted {
+ t.Error("POST_UPDATE callback was not executed")
+ }
}
}
-func Test_Proxy_3_1_RegisterProxy(t *testing.T) {
+
+func Test_Proxy_3_2_Update_Flow_WithSubProxy(t *testing.T) {
// Get a device proxy and update a specific port
- devProxy := pt.Root.GetProxy("/devices/"+devId, false)
- port123 := devProxy.Get("/ports/123", 0, false, "")
- t.Logf("got ports: %+v", port123)
+ devflowsProxy := pt.Root.GetProxy("/devices/"+devId+"/flows", false)
+ flows := devflowsProxy.Get("/", 0, false, "")
+ //flows.([]interface{})[0].(*openflow_13.Flows).Items[0].TableId = 2222
+ flows.([]interface{})[0].(*openflow_13.Flows).Items[0].TableId = 2244
+ //flows.(*openflow_13.Flows).Items[0].TableId = 2244
+ t.Logf("before updated flows: %+v", flows)
- devProxy.RegisterCallback(POST_UPDATE, deviceCallback, nil)
+ //devPortsProxy := pt.Root.node.GetProxy("/devices/"+devId+"/ports", false)
+ //port123 := devPortsProxy.Get("/123", 0, false, "")
+ //t.Logf("got ports: %+v", port123)
+ //port123.(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
- port123.(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
+ preUpdateExecuted = false
+ postUpdateExecuted = false
- devProxy.Update("/ports/123", port123, false, "")
- updated := devProxy.Get("/ports", 0, false, "")
- t.Logf("got updated ports: %+v", updated)
+ devflowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback,
+ "PRE_UPDATE instructions (flows proxy)", &preUpdateExecuted,
+ )
+ devflowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback,
+ "POST_UPDATE instructions (flows proxy)", &postUpdateExecuted,
+ )
+ kvFlows := devflowsProxy.Get("/", 0, false, "")
+
+ if reflect.DeepEqual(flows, kvFlows) {
+ t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
+ }
+
+ if updated := devflowsProxy.Update("/", flows.([]interface{})[0], false, ""); updated == nil {
+ t.Error("Failed to update flow")
+ } else {
+ t.Logf("Updated flows : %+v", updated)
+ }
+
+ if d := devflowsProxy.Get("/", 0, false, ""); d == nil {
+ t.Error("Failed to find updated flows (flows proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (flows proxy): %s", string(djson))
+ }
+
+ if d := pt.Proxy.Get("/devices/"+devId+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated flows (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (root proxy): %s", string(djson))
+ }
+
+ if !preUpdateExecuted {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !postUpdateExecuted {
+ t.Error("POST_UPDATE callback was not executed")
+ }
//
// Get a device proxy and update all its ports
//
@@ -141,7 +296,7 @@
//devProxy := pt.Root.GetProxy("/devices/"+devId, false)
//ports := devProxy.Get("/ports", 0, false, "")
//t.Logf("got ports: %+v", ports)
- //devProxy.RegisterCallback(POST_UPDATE, deviceCallback, nil)
+ //devProxy.RegisterCallback(POST_UPDATE, commonCallback, nil)
//
//ports.([]interface{})[0].(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
//
@@ -156,7 +311,7 @@
//devProxy := pt.Root.GetProxy("/devices/"+devId, false)
//ports := devProxy.Get("/ports", 0, false, "")
//t.Logf("got ports: %+v", ports)
- //devProxy.RegisterCallback(POST_UPDATE, deviceCallback, nil)
+ //devProxy.RegisterCallback(POST_UPDATE, commonCallback, nil)
//
//ports.([]interface{})[0].(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
//
@@ -165,61 +320,39 @@
//t.Logf("got updated ports: %+v", updated)
}
-func Test_Proxy_3_2_GetDevice_PostRegister(t *testing.T) {
- if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Error("Failed to find updated registered device")
- } else {
- djson, _ := json.Marshal(d)
+func Test_Proxy_4_1_Remove_Device(t *testing.T) {
+ preRemoveExecuted = false
+ postRemoveExecuted = false
- t.Logf("Found device: count: %s", djson)
- }
-}
+ pt.Proxy.RegisterCallback(
+ PRE_REMOVE,
+ commonCallback,
+ "PRE_REMOVE instructions (root proxy)", &preRemoveExecuted,
+ )
+ pt.Proxy.RegisterCallback(
+ POST_REMOVE,
+ commonCallback,
+ "POST_REMOVE instructions (root proxy)", &postRemoveExecuted,
+ )
-func Test_Proxy_4_UpdateDevice(t *testing.T) {
- if retrieved := pt.Proxy.Get("/devices/"+targetDeviceId, 1, false, ""); retrieved == nil {
- t.Error("Failed to get device")
- } else {
- var fwVersion int
- if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
- fwVersion = 0
- } else {
- fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
- fwVersion += 1
- }
-
- cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
- cloned.FirmwareVersion = strconv.Itoa(fwVersion)
- t.Logf("Before update : %+v", cloned)
-
- if afterUpdate := pt.Proxy.Update("/devices/"+targetDeviceId, &cloned, false, ""); afterUpdate == nil {
- t.Error("Failed to update device")
- } else {
- t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
- }
- }
-}
-
-func Test_Proxy_5_GetDevice_PostUpdate(t *testing.T) {
- device := pt.Proxy.Get("/devices/"+targetDeviceId, 0, false, "")
-
- t.Logf("content of updated device: %+v", device)
-}
-
-func Test_Proxy_6_RemoveDevice(t *testing.T) {
if removed := pt.Proxy.Remove("/devices/"+devId, ""); removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
}
-}
-
-func Test_Proxy_7_GetDevice_PostRemove(t *testing.T) {
if d := pt.Proxy.Get("/devices/"+devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
t.Logf("Device was removed: %s", devId)
}
+
+ if !preRemoveExecuted {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !postRemoveExecuted {
+ t.Error("POST_UPDATE callback was not executed")
+ }
}
// -----------------------------
@@ -232,10 +365,6 @@
log.Infof("Running first callback - name: %s, id: %s\n", name, id)
return nil
}
-func deviceCallback(args ...interface{}) interface{} {
- log.Infof("Running device callback\n")
- return nil
-}
func secondCallback(args ...interface{}) interface{} {
name := args[0].(map[string]string)
id := args[1]
@@ -263,10 +392,11 @@
}
func Test_Proxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
- pt.Proxy.InvokeCallbacks(PRE_ADD, nil, true)
+ pt.Proxy.InvokeCallbacks(PRE_ADD, false, nil)
}
+
func Test_Proxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
- pt.Proxy.InvokeCallbacks(PRE_ADD, nil, false)
+ pt.Proxy.InvokeCallbacks(PRE_ADD, true, nil)
}
func Test_Proxy_Callbacks_4_Unregister(t *testing.T) {
@@ -274,3 +404,11 @@
pt.Proxy.UnregisterCallback(PRE_ADD, secondCallback)
pt.Proxy.UnregisterCallback(PRE_ADD, thirdCallback)
}
+
+//func Test_Proxy_Callbacks_5_Add(t *testing.T) {
+// pt.Proxy.Root.AddCallback(pt.Proxy.InvokeCallbacks, POST_UPDATE, false, "some data", "some new data")
+//}
+//
+//func Test_Proxy_Callbacks_6_Execute(t *testing.T) {
+// pt.Proxy.Root.ExecuteCallbacks()
+//}
diff --git a/db/model/root.go b/db/model/root.go
index 27e4ec4..45eafb9 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -29,18 +29,20 @@
}
type root struct {
- node
+ *node
+
+ Callbacks []CallbackTuple
+ NotificationCallbacks []CallbackTuple
DirtyNodes map[string][]*node
KvStore *Backend
Loading bool
RevisionClass interface{}
- Callbacks []CallbackTuple
- NotificationCallbacks []CallbackTuple
}
func NewRoot(initialData interface{}, kvStore *Backend) *root {
root := &root{}
+
root.KvStore = kvStore
root.DirtyNodes = make(map[string][]*node)
root.Loading = false
@@ -53,23 +55,15 @@
root.Callbacks = []CallbackTuple{}
root.NotificationCallbacks = []CallbackTuple{}
- root.node = *NewNode(root, initialData, false, "")
+ root.node = NewNode(root, initialData,false, "")
return root
}
-func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- if r.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
- return NewPersistedRevision(branch, data, children)
- }
-
- return NewNonPersistedRevision(branch, data, children)
-}
-
func (r *root) MakeTxBranch() string {
txid_bin, _ := uuid.New().MarshalBinary()
txid := hex.EncodeToString(txid_bin)[:12]
- r.DirtyNodes[txid] = []*node{&r.node}
+ r.DirtyNodes[txid] = []*node{r.node}
r.node.MakeBranch(txid)
return txid
}
@@ -135,7 +129,7 @@
result = r.node.Update(path, data, strict, "", nil)
}
- r.ExecuteCallbacks()
+ r.node.ExecuteCallbacks()
return result
}
@@ -161,7 +155,7 @@
result = r.node.Add(path, data, "", nil)
}
- r.ExecuteCallbacks()
+ r.node.ExecuteCallbacks()
return result
}
@@ -187,7 +181,7 @@
result = r.node.Remove(path, "", nil)
}
- r.ExecuteCallbacks()
+ r.node.ExecuteCallbacks()
return result
}
@@ -227,10 +221,6 @@
}
}
-func (r *root) LoadLatest(hash string) {
- r.node.LoadLatest(r.KvStore, hash)
-}
-
type rootData struct {
Latest string `json:latest`
Tags map[string]string `json:tags`
@@ -249,10 +239,10 @@
stop := time.Now()
GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
for tag, hash := range data.Tags {
- r.node.LoadLatest(r.KvStore, hash)
- r.node.Tags[tag] = r.node.Latest()
+ r.LoadLatest(hash)
+ r.Tags[tag] = r.Latest()
}
- r.node.LoadLatest(r.KvStore, data.Latest)
+ r.LoadLatest(data.Latest)
r.Loading = false
}
diff --git a/db/model/utils.go b/db/model/utils.go
index 1966c5c..1a460ed 100644
--- a/db/model/utils.go
+++ b/db/model/utils.go
@@ -23,7 +23,7 @@
)
func IsProtoMessage(object interface{}) bool {
- var ok bool
+ var ok = false
if object != nil {
st := reflect.TypeOf(object)
@@ -33,17 +33,29 @@
}
func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
+ prefix := ""
+ for d:=0; d< depth; d++ {
+ prefix += ">>"
+ }
k := obj.Kind()
switch k {
case reflect.Ptr:
+ if found {
+ return obj.Type()
+ }
+
t := obj.Type().Elem()
n := reflect.New(t)
- if rc := FindOwnerType(n.Elem(), name, depth+1, false); rc != nil {
+ if rc := FindOwnerType(n.Elem(), name, depth+1, found); rc != nil {
return rc
}
case reflect.Struct:
+ if found {
+ return obj.Type()
+ }
+
for i := 0; i < obj.NumField(); i += 1 {
v := reflect.Indirect(obj)
@@ -53,7 +65,7 @@
return FindOwnerType(obj.Field(i), name, depth+1, true)
}
- if rc := FindOwnerType(obj.Field(i), name, depth+1, false); rc != nil {
+ if rc := FindOwnerType(obj.Field(i), name, depth+1, found); rc != nil {
return rc
}
}
@@ -73,7 +85,7 @@
return obj.Index(i).Type()
}
- if rc := FindOwnerType(obj.Index(i), name, depth+1, false); rc != nil {
+ if rc := FindOwnerType(obj.Index(i), name, depth+1, found); rc != nil {
return rc
}
}