VOL-1175: Added proxy CRUD for new data model
Change-Id: Ie218a2567746d87a951f23aa6b774b2f01541cf9
diff --git a/db/model/backend.go b/db/model/backend.go
index 396e233..340dd26 100644
--- a/db/model/backend.go
+++ b/db/model/backend.go
@@ -20,6 +20,8 @@
"fmt"
"github.com/opencord/voltha-go/db/kvstore"
"strconv"
+ "time"
+ "github.com/opencord/voltha-go/common/log"
)
//TODO: missing cache stuff
@@ -48,7 +50,7 @@
address := host + ":" + strconv.Itoa(port)
if b.Client, err = b.newClient(address, timeout); err != nil {
- fmt.Errorf("failed to create a new kv Client - %s", err.Error())
+ log.Errorf("failed to create a new kv Client - %s", err.Error())
}
return b
@@ -65,12 +67,22 @@
}
+// makePath joins the backend's PathPrefix and key with a "/" separator, logs
+// the resulting path at debug level and returns it.
func (b *Backend) makePath(key string) string {
- return fmt.Sprintf("%s/%s", b.PathPrefix, key)
+ path := fmt.Sprintf("%s/%s", b.PathPrefix, key)
+ log.Debugf("formatting path: %s", path)
+ return path
+}
+// List forwards to the KV client's List call using the prefixed form of key
+// and the backend's Timeout, and returns its result unchanged.
+func (b *Backend) List(key string) (map[string]*kvstore.KVPair, error) {
+ return b.Client.List(b.makePath(key), b.Timeout)
}
+// Get reads the pair stored under the prefixed form of key through the KV
+// client. The elapsed time of the call, in seconds, is handed to
+// GetProfiling().AddToDatabaseRetrieveTime before the client's result is
+// returned unchanged.
func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
- return b.Client.Get(b.makePath(key), b.Timeout)
+ start := time.Now()
+ // Client.Get yields (pair, error) in that order; name the results to match
+ // so the error is never mistaken for the value.
+ pair, err := b.Client.Get(b.makePath(key), b.Timeout)
+ GetProfiling().AddToDatabaseRetrieveTime(time.Since(start).Seconds())
+ return pair, err
}
+// Put stores value under the prefixed form of key through the KV client and
+// returns the client's error. The key, value and path are logged at debug
+// level first.
func (b *Backend) Put(key string, value interface{}) error {
+ // value is an interface{}: only render it as a string when it really is a
+ // []byte. An unchecked value.([]byte) would panic for any other payload
+ // type just to produce a debug line.
+ if raw, ok := value.([]byte); ok {
+ log.Debugf("Put key: %s, value: %+v, path: %s", key, string(raw), b.makePath(key))
+ } else {
+ log.Debugf("Put key: %s, value: %+v, path: %s", key, value, b.makePath(key))
+ }
return b.Client.Put(b.makePath(key), value, b.Timeout)
}
func (b *Backend) Delete(key string) error {
diff --git a/db/model/branch.go b/db/model/branch.go
index 44fe230..b59f727 100644
--- a/db/model/branch.go
+++ b/db/model/branch.go
@@ -19,31 +19,31 @@
// TODO: missing proper logging
+// Branch ties a Node to the revisions recorded for it under one transaction
+// id: the revision the branch was created from (Origin), the revisions indexed
+// by hash (Revisions) and the current one (Latest).
type Branch struct {
- node *Node
+ Node *Node
Txid string
- origin *Revision
- revisions map[string]*Revision
- Latest *Revision
+ Origin Revision
+ Revisions map[string]Revision
+ Latest Revision
}
-func NewBranch(node *Node, txid string, origin *Revision, autoPrune bool) *Branch {
+// NewBranch returns a Branch for node and txid whose Origin and Latest are
+// both set to origin and whose Revisions map is empty. The autoPrune argument
+// is accepted but not used by this constructor.
+func NewBranch(node *Node, txid string, origin Revision, autoPrune bool) *Branch {
cb := &Branch{}
- cb.node = node
+ cb.Node = node
cb.Txid = txid
- cb.origin = origin
- cb.revisions = make(map[string]*Revision)
+ cb.Origin = origin
+ cb.Revisions = make(map[string]Revision)
cb.Latest = origin
return cb
}
// TODO: Check if the following are required
-func (cb *Branch) get(hash string) *Revision {
- return cb.revisions[hash]
+// get returns the revision stored under hash in Revisions, or nil when the
+// map has no entry for it.
+func (cb *Branch) get(hash string) Revision {
+ return cb.Revisions[hash]
}
-func (cb *Branch) GetLatest() *Revision {
+// GetLatest returns the branch's Latest revision.
+func (cb *Branch) GetLatest() Revision {
return cb.Latest
}
-func (cb *Branch) GetOrigin() *Revision {
- return cb.origin
+// GetOrigin returns the revision the branch was created with.
+func (cb *Branch) GetOrigin() Revision {
+ return cb.Origin
}
diff --git a/db/model/branch_test.go b/db/model/branch_test.go
index 719bd82..456ec79 100644
--- a/db/model/branch_test.go
+++ b/db/model/branch_test.go
@@ -29,31 +29,31 @@
+// Test_ConfigBranch_New builds a branch from an empty node, an origin revision
+// whose hash is the MD5 of "origin_hash" and a txid derived the same way,
+// stores it in the package-level BRANCH and logs it. It makes no assertions.
func Test_ConfigBranch_New(t *testing.T) {
node := &Node{}
hash := fmt.Sprintf("%x", md5.Sum([]byte("origin_hash")))
- origin := &Revision{
+ origin := &NonPersistedRevision{
Config: &DataRevision{},
- Children: make(map[string][]*Revision),
+ Children: make(map[string][]Revision),
Hash: hash,
- branch: &Branch{},
+ Branch: &Branch{},
WeakRef: "need to fix this",
}
txid := fmt.Sprintf("%x", md5.Sum([]byte("branch_transaction_id")))
BRANCH = NewBranch(node, txid, origin, true)
- t.Logf("New branch created: %+v\n", BRANCH)
+ t.Logf("New Branch created: %+v\n", BRANCH)
}
+// Test_ConfigBranch_AddRevision sets HASH to the MD5 of "revision_hash",
+// stores a new revision under that hash in BRANCH.Revisions and logs it. It
+// reads the BRANCH value assigned by Test_ConfigBranch_New.
func Test_ConfigBranch_AddRevision(t *testing.T) {
HASH = fmt.Sprintf("%x", md5.Sum([]byte("revision_hash")))
- rev := &Revision{
+ rev := &NonPersistedRevision{
Config: &DataRevision{},
- Children: make(map[string][]*Revision),
+ Children: make(map[string][]Revision),
Hash: HASH,
- branch: &Branch{},
+ Branch: &Branch{},
WeakRef: "need to fix this",
}
- BRANCH.revisions[HASH] = rev
+ BRANCH.Revisions[HASH] = rev
t.Logf("Added revision: %+v\n", rev)
}
@@ -66,6 +66,6 @@
t.Logf("Got GetLatest revision:%+v\n", rev)
}
+// Test_ConfigBranch_OriginRevision logs the Origin revision of BRANCH. It
+// makes no assertions.
func Test_ConfigBranch_OriginRevision(t *testing.T) {
- rev := BRANCH.origin
- t.Logf("Got origin revision:%+v\n", rev)
+ rev := BRANCH.Origin
+ t.Logf("Got Origin revision:%+v\n", rev)
}
diff --git a/db/model/child_type.go b/db/model/child_type.go
index 0020e7d..13b0245 100644
--- a/db/model/child_type.go
+++ b/db/model/child_type.go
@@ -20,7 +20,7 @@
desc "github.com/golang/protobuf/descriptor"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
- "github.com/opencord/voltha/protos/go/common"
+ "github.com/opencord/voltha-go/protos/common"
"reflect"
"strconv"
"sync"
@@ -61,7 +61,9 @@
msgType := reflect.TypeOf(cls)
- if names, names_exist = GetInstance().ChildrenFieldsCache[msgType.String()]; !names_exist {
+ inst := GetInstance()
+
+ if names, names_exist = inst.ChildrenFieldsCache[msgType.String()]; !names_exist {
names = make(map[string]*ChildType)
_, md := desc.ForMessage(cls.(desc.Message))
@@ -75,7 +77,7 @@
var keyFromStr func(string) interface{}
if meta.(*common.ChildNode).GetKey() == "" {
- fmt.Println("Child key is empty ... moving on")
+ //fmt.Println("Child key is empty ... moving on")
} else {
parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
keyType := FindKeyOwner(reflect.New(parentType).Elem().Interface(), meta.(*common.ChildNode).GetKey(), 0)
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
index f60462c..2f75eba 100644
--- a/db/model/child_type_test.go
+++ b/db/model/child_type_test.go
@@ -17,7 +17,7 @@
import (
"fmt"
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/voltha"
"reflect"
"testing"
)
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
index 6d95a4d..c20eac3 100644
--- a/db/model/event_bus.go
+++ b/db/model/event_bus.go
@@ -18,7 +18,7 @@
import (
"encoding/json"
"fmt"
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/voltha"
)
type EventBus struct {
diff --git a/db/model/event_bus_client.go b/db/model/event_bus_client.go
index 6d3ce2d..1ca1d78 100644
--- a/db/model/event_bus_client.go
+++ b/db/model/event_bus_client.go
@@ -17,7 +17,7 @@
import (
"fmt"
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/voltha"
)
type EventBusClient struct {
diff --git a/db/model/merge.go b/db/model/merge.go
new file mode 100644
index 0000000..8ff89c4
--- /dev/null
+++ b/db/model/merge.go
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "reflect"
+)
+
+// revisionsAreEqual reports whether a and b hold the same revisions in the
+// same order. Elements are compared with ==. A nil slice only equals another
+// nil slice: nil and an empty, non-nil slice are reported as different.
+func revisionsAreEqual(a, b []Revision) bool {
+	aIsNil, bIsNil := a == nil, b == nil
+	if aIsNil != bIsNil || len(a) != len(b) {
+		return false
+	}
+
+	for idx, left := range a {
+		if left != b[idx] {
+			return false
+		}
+	}
+
+	return true
+}
+
+// changeAnalysis is the result of comparing two revision lists by a key
+// attribute; it is filled in by newChangeAnalysis.
+//
+// NOTE(review): every map is keyed by reflect.Value. The reflect package
+// documents that == on two Values does not compare the values they represent,
+// so equal keys read from different objects may not match - confirm against
+// what GetAttributeValue returns.
+type changeAnalysis struct {
+ // KeyMap1 maps each key found in the first list to its index in that list.
+ KeyMap1 map[reflect.Value]int
+ // KeyMap2 maps each key found in the second list to its index in that list.
+ KeyMap2 map[reflect.Value]int
+ // AddedKeys holds the keys present in KeyMap2 but not in KeyMap1.
+ AddedKeys map[reflect.Value]struct{}
+ // RemovedKeys holds the keys present in KeyMap1 but not in KeyMap2.
+ RemovedKeys map[reflect.Value]struct{}
+ // ChangedKeys holds the keys present in both maps whose revisions differ by hash.
+ ChangedKeys map[reflect.Value]struct{}
+}
+
+// newChangeAnalysis compares two revision lists using the keyName attribute
+// of each revision's data as the identity of an entry.
+//
+// lst1 is the reference list and lst2 the list being compared against it.
+// The returned analysis records the index of every key in each list, plus the
+// keys that exist only in lst2 (added), only in lst1 (removed), or in both
+// with different revision hashes (changed).
+//
+// NOTE(review): keys are the reflect.Value returned by GetAttributeValue and
+// are matched with map equality; confirm that equal key values obtained from
+// different data objects produce equal reflect.Values.
+func newChangeAnalysis(lst1, lst2 []Revision, keyName string) *changeAnalysis {
+	changes := &changeAnalysis{
+		KeyMap1:     make(map[reflect.Value]int, len(lst1)),
+		KeyMap2:     make(map[reflect.Value]int, len(lst2)),
+		AddedKeys:   make(map[reflect.Value]struct{}),
+		RemovedKeys: make(map[reflect.Value]struct{}),
+		ChangedKeys: make(map[reflect.Value]struct{}),
+	}
+
+	for i, rev := range lst1 {
+		_, v := GetAttributeValue(rev.GetData(), keyName, 0)
+		changes.KeyMap1[v] = i
+	}
+	for i, rev := range lst2 {
+		_, v := GetAttributeValue(rev.GetData(), keyName, 0)
+		changes.KeyMap2[v] = i
+	}
+
+	for v := range changes.KeyMap2 {
+		if _, ok := changes.KeyMap1[v]; !ok {
+			changes.AddedKeys[v] = struct{}{}
+		}
+	}
+
+	for v, idx1 := range changes.KeyMap1 {
+		idx2, ok := changes.KeyMap2[v]
+		if !ok {
+			changes.RemovedKeys[v] = struct{}{}
+			continue
+		}
+		// idx2 is a position in lst2, so the counterpart must be read from
+		// lst2. Reading lst1[idx2] compared the wrong revisions and could
+		// index past the end of lst1.
+		if lst1[idx1].GetHash() != lst2[idx2].GetHash() {
+			changes.ChangedKeys[v] = struct{}{}
+		}
+	}
+
+	return changes
+}
+
+// Merge3Way merges the changes made between forkRev and srcRev into dstRev.
+//
+// forkRev is the revision both sides started from, srcRev the tip being merged
+// in and dstRev the tip being merged into. mergeChildFunc is invoked on child
+// revisions so that nested nodes can complete their own merge; its result
+// replaces the child in the merged list wherever a child is taken from src.
+//
+// For every children field of forkRev's data:
+//   - if src did not change the list, dst's list is kept (children are still
+//     visited through mergeChildFunc);
+//   - an un-keyed list is taken from src when dst did not change it, and is
+//     reported as a conflict when both sides changed it;
+//   - a keyed list is merged entry by entry: additions, changes and removals
+//     made by src are applied on top of dst and conflicting edits are logged.
+//
+// Conflicts are only logged; the merge carries on with dst's version of the
+// conflicting entry.
+//
+// When dryRun is true the analysis still runs (including mergeChildFunc) but
+// nil, nil is returned. Otherwise the result is the new revision - based on
+// srcRev when the config changed, on dstRev when it did not - with the merged
+// children applied, together with the callback payloads collected per
+// CallbackType.
+func Merge3Way(
+	forkRev, srcRev, dstRev Revision,
+	mergeChildFunc func(Revision) Revision,
+	dryRun bool) (rev Revision, changes map[CallbackType][]interface{}) {
+
+	// The named result starts out nil and writing to a nil map panics, so
+	// allocate it before anything is recorded.
+	changes = make(map[CallbackType][]interface{})
+
+	var configChanged bool
+	if dstRev.GetConfig() == forkRev.GetConfig() {
+		configChanged = dstRev.GetConfig() != srcRev.GetConfig()
+	} else {
+		if dstRev.GetConfig().Hash != srcRev.GetConfig().Hash {
+			log.Error("config-collision")
+		}
+		configChanged = true
+	}
+
+	// Work on a shallow copy of dst's children so that dstRev itself is left
+	// untouched. reflect.Value.Elem is only defined for interfaces and
+	// pointers and panics on a map, so copy explicitly.
+	dstChildren := dstRev.GetChildren()
+	newChildren := make(map[string][]Revision, len(dstChildren))
+	for name, list := range dstChildren {
+		newChildren[name] = list
+	}
+
+	for fieldName, field := range ChildrenFields(forkRev.GetData()) {
+		forkList := forkRev.GetChildren()[fieldName]
+		srcList := srcRev.GetChildren()[fieldName]
+		dstList := dstRev.GetChildren()[fieldName]
+
+		if revisionsAreEqual(forkList, srcList) {
+			// src made no change to this list: keep dst's version, but still
+			// visit the children so nested merges can complete.
+			for _, child := range srcList {
+				mergeChildFunc(child)
+			}
+			continue
+		}
+
+		// From here on src is known to differ from fork.
+
+		if field.Key == "" {
+			if !revisionsAreEqual(dstList, forkList) {
+				// Both sides changed a list whose entries cannot be matched up.
+				log.Error("cannot merge - single child node or un-keyed children list has changed")
+				continue
+			}
+
+			// Only src changed: its list replaces dst's.
+			merged := make([]Revision, 0, len(srcList))
+			for _, child := range srcList {
+				merged = append(merged, mergeChildFunc(child))
+			}
+			newChildren[fieldName] = merged
+
+			if field.IsContainer {
+				changes[POST_LISTCHANGE] = append(
+					changes[POST_LISTCHANGE],
+					NewOperationContext("", nil, fieldName, ""),
+				)
+			}
+			continue
+		}
+
+		src := newChangeAnalysis(forkList, srcList, field.Key)
+
+		if revisionsAreEqual(dstList, forkList) {
+			// Only src changed a keyed list: start from a copy of src's list.
+			newList := append([]Revision(nil), srcList...)
+
+			for key := range src.AddedKeys {
+				// Added keys exist only in the second (src) list.
+				idx := src.KeyMap2[key]
+				newRev := mergeChildFunc(newList[idx])
+				newList[idx] = newRev
+
+				changes[POST_ADD] = append(changes[POST_ADD], newRev.GetData())
+			}
+			for key := range src.RemovedKeys {
+				oldRev := forkList[src.KeyMap1[key]]
+
+				changes[POST_REMOVE] = append(changes[POST_REMOVE], oldRev.GetData())
+			}
+			for key := range src.ChangedKeys {
+				idx := src.KeyMap2[key]
+				newList[idx] = mergeChildFunc(newList[idx])
+			}
+
+			newChildren[fieldName] = newList
+			continue
+		}
+
+		// Both sides changed a keyed list: apply src's changes on top of a
+		// copy of dst's list.
+		dst := newChangeAnalysis(forkList, dstList, field.Key)
+		newList := append([]Revision(nil), dstList...)
+
+		for key := range src.AddedKeys {
+			if _, exists := dst.AddedKeys[key]; exists {
+				childDstRev := dstList[dst.KeyMap2[key]]
+				childSrcRev := srcList[src.KeyMap2[key]]
+				if childDstRev.GetHash() == childSrcRev.GetHash() {
+					mergeChildFunc(childDstRev)
+				} else {
+					log.Error("conflict error - revision has been added is different")
+				}
+				continue
+			}
+
+			newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
+			newList = append(newList, newRev)
+			changes[POST_ADD] = append(changes[POST_ADD], newRev.GetData())
+		}
+
+		for key := range src.ChangedKeys {
+			if _, removed := dst.RemovedKeys[key]; removed {
+				log.Error("conflict error - revision has been removed")
+				continue
+			}
+
+			childSrcRev := srcList[src.KeyMap2[key]]
+			dstIdx := dst.KeyMap2[key]
+
+			if _, changed := dst.ChangedKeys[key]; changed {
+				childDstRev := dstList[dstIdx]
+				if childDstRev.GetHash() == childSrcRev.GetHash() {
+					mergeChildFunc(childSrcRev)
+					continue
+				}
+				if childDstRev.GetConfig().Hash != childSrcRev.GetConfig().Hash {
+					log.Error("conflict error - revision has been changed and is different")
+					continue
+				}
+			}
+
+			newList[dstIdx] = mergeChildFunc(childSrcRev)
+		}
+
+		// Collect the positions to drop first and filter once afterwards:
+		// deleting entries one at a time would shift the positions recorded
+		// in dst.KeyMap2 for the removals that follow.
+		dropped := make(map[int]struct{}, len(src.RemovedKeys))
+		for key := range src.RemovedKeys {
+			if _, changed := dst.ChangedKeys[key]; changed {
+				log.Error("conflict error - revision has changed")
+				continue
+			}
+			if _, removed := dst.RemovedKeys[key]; removed {
+				// Already gone from dst; it has no position in dst's list.
+				continue
+			}
+
+			dstIdx := dst.KeyMap2[key]
+			dropped[dstIdx] = struct{}{}
+			changes[POST_REMOVE] = append(changes[POST_REMOVE], newList[dstIdx].GetData())
+		}
+		if len(dropped) > 0 {
+			kept := make([]Revision, 0, len(newList)-len(dropped))
+			for idx, child := range newList {
+				if _, gone := dropped[idx]; !gone {
+					kept = append(kept, child)
+				}
+			}
+			newList = kept
+		}
+
+		newChildren[fieldName] = newList
+	}
+
+	if dryRun {
+		return nil, nil
+	}
+
+	if configChanged {
+		rev = srcRev
+	} else {
+		rev = dstRev
+	}
+
+	rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
+
+	if configChanged {
+		changes[POST_UPDATE] = append(changes[POST_UPDATE], rev.GetData())
+	}
+
+	return rev, changes
+}
diff --git a/db/model/node.go b/db/model/node.go
index eacbec7..90ab666 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -18,6 +18,7 @@
import (
"fmt"
"github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
"reflect"
"strings"
)
@@ -30,55 +31,55 @@
root *Root
Type interface{}
Branches map[string]*Branch
- Tags map[string]*Revision
+ Tags map[string]Revision
Proxy *Proxy
EventBus *EventBus
AutoPrune bool
}
+// NewNode creates a node attached to root with empty Branches and Tags maps,
+// no Proxy or EventBus, and the given autoPrune setting.
+//
+// When initialData is a proto message its value becomes the node Type and a
+// clone of it is passed to initialize for txid. Any other valid value only
+// sets the Type. An invalid value is logged as an error and the node is
+// returned without a Type.
func NewNode(root *Root, initialData interface{}, autoPrune bool, txid string) *Node {
- cn := &Node{}
+ n := &Node{}
- cn.root = root
- cn.Branches = make(map[string]*Branch)
- cn.Tags = make(map[string]*Revision)
- cn.Proxy = nil
- cn.EventBus = nil
- cn.AutoPrune = autoPrune
+ n.root = root
+ n.Branches = make(map[string]*Branch)
+ n.Tags = make(map[string]Revision)
+ n.Proxy = nil
+ n.EventBus = nil
+ n.AutoPrune = autoPrune
if IsProtoMessage(initialData) {
- cn.Type = reflect.ValueOf(initialData).Interface()
+ n.Type = reflect.ValueOf(initialData).Interface()
dataCopy := proto.Clone(initialData.(proto.Message))
- cn.initialize(dataCopy, txid)
+ n.initialize(dataCopy, txid)
} else if reflect.ValueOf(initialData).IsValid() {
- cn.Type = reflect.ValueOf(initialData).Interface()
+ n.Type = reflect.ValueOf(initialData).Interface()
} else {
// not implemented error
- fmt.Errorf("cannot process initial data - %+v", initialData)
+ // fmt.Errorf only builds an error value; discarding it reported nothing.
+ log.Errorf("cannot process initial data - %+v", initialData)
}
- return cn
+ return n
}
-func (cn *Node) makeNode(data interface{}, txid string) *Node {
- return NewNode(cn.root, data, true, txid)
+// MakeNode creates a new node for data under this node's root, with auto-prune
+// enabled, for the given txid.
+func (n *Node) MakeNode(data interface{}, txid string) *Node {
+ return NewNode(n.root, data, true, txid)
}
-func (cn *Node) makeRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
- return cn.root.makeRevision(branch, data, children)
+// MakeRevision delegates to the root's MakeRevision with the same arguments
+// and returns its result.
+func (n *Node) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ return n.root.MakeRevision(branch, data, children)
}
-func (cn *Node) makeLatest(branch *Branch, revision *Revision, changeAnnouncement map[string]interface{}) {
- if _, ok := branch.revisions[revision.Hash]; !ok {
- branch.revisions[revision.Hash] = revision
+func (n *Node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+ if _, ok := branch.Revisions[revision.GetHash()]; !ok {
+ branch.Revisions[revision.GetHash()] = revision
}
- if branch.Latest == nil || revision.Hash != branch.Latest.Hash {
+ if branch.Latest == nil || revision.GetHash() != branch.Latest.GetHash() {
branch.Latest = revision
}
if changeAnnouncement != nil && branch.Txid == "" {
- if cn.Proxy != nil {
+ if n.Proxy != nil {
for changeType, data := range changeAnnouncement {
// TODO: Invoke callback
fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", changeType, data)
@@ -92,22 +93,22 @@
}
}
-func (cn *Node) Latest() *Revision {
- if branch, exists := cn.Branches[NONE]; exists {
+// Latest returns the Latest revision of the NONE branch, or nil when the node
+// has no such branch.
+func (n *Node) Latest() Revision {
+ if branch, exists := n.Branches[NONE]; exists {
return branch.Latest
}
return nil
}
-func (cn *Node) GetHash(hash string) *Revision {
- return cn.Branches[NONE].revisions[hash]
+// GetHash looks hash up in the Revisions of the NONE branch. It does not check
+// that the NONE branch exists before dereferencing it.
+func (n *Node) GetHash(hash string) Revision {
+ return n.Branches[NONE].Revisions[hash]
}
-func (cn *Node) initialize(data interface{}, txid string) {
- var children map[string][]*Revision
- children = make(map[string][]*Revision)
- for fieldName, field := range ChildrenFields(cn.Type) {
- fieldValue := GetAttributeValue(data, fieldName, 0)
+func (n *Node) initialize(data interface{}, txid string) {
+ var children map[string][]Revision
+ children = make(map[string][]Revision)
+ for fieldName, field := range ChildrenFields(n.Type) {
+ _, fieldValue := GetAttributeValue(data, fieldName, 0)
if fieldValue.IsValid() {
if field.IsContainer {
@@ -116,8 +117,8 @@
for i := 0; i < fieldValue.Len(); i++ {
v := fieldValue.Index(i)
- rev := cn.makeNode(v.Interface(), txid).Latest()
- key := GetAttributeValue(v.Interface(), field.Key, 0)
+ rev := n.MakeNode(v.Interface(), txid).Latest()
+ _, key := GetAttributeValue(v.Interface(), field.Key, 0)
for _, k := range keysSeen {
if k == key.String() {
fmt.Errorf("duplicate key - %s", k)
@@ -130,11 +131,11 @@
} else {
for i := 0; i < fieldValue.Len(); i++ {
v := fieldValue.Index(i)
- children[fieldName] = append(children[fieldName], cn.makeNode(v.Interface(), txid).Latest())
+ children[fieldName] = append(children[fieldName], n.MakeNode(v.Interface(), txid).Latest())
}
}
} else {
- children[fieldName] = append(children[fieldName], cn.makeNode(fieldValue.Interface(), txid).Latest())
+ children[fieldName] = append(children[fieldName], n.MakeNode(fieldValue.Interface(), txid).Latest())
}
} else {
fmt.Errorf("field is invalid - %+v", fieldValue)
@@ -142,29 +143,21 @@
}
// FIXME: ClearField??? No such method in go protos. Reset?
//data.ClearField(field_name)
- branch := NewBranch(cn, "", nil, cn.AutoPrune)
- rev := cn.makeRevision(branch, data, children)
- cn.makeLatest(branch, rev, nil)
- cn.Branches[txid] = branch
-}
+ branch := NewBranch(n, "", nil, n.AutoPrune)
+ rev := n.MakeRevision(branch, data, children)
+ n.MakeLatest(branch, rev, nil)
-func (cn *Node) makeTxBranch(txid string) *Branch {
- branchPoint := cn.Branches[NONE].Latest
- branch := NewBranch(cn, txid, branchPoint, true)
- cn.Branches[txid] = branch
- return branch
+ if txid == "" {
+ n.Branches[NONE] = branch
+ } else {
+ n.Branches[txid] = branch
+ }
}
-func (cn *Node) deleteTxBranch(txid string) {
- delete(cn.Branches, txid)
-}
-
-type t_makeBranch func(*Node) *Branch
-
//
// Get operation
//
-func (cn *Node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
+func (n *Node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
if deep {
depth = -1
}
@@ -174,26 +167,26 @@
}
var branch *Branch
- var rev *Revision
+ var rev Revision
// FIXME: should empty txid be cleaned up?
- if branch = cn.Branches[txid]; txid == "" || branch == nil {
- branch = cn.Branches[NONE]
+ if branch = n.Branches[txid]; txid == "" || branch == nil {
+ branch = n.Branches[NONE]
}
if hash != "" {
- rev = branch.revisions[hash]
+ rev = branch.Revisions[hash]
} else {
rev = branch.Latest
}
- return cn.get(rev, path, depth)
+ return n.get(rev, path, depth)
}
-func (cn *Node) findRevByKey(revs []*Revision, keyName string, value string) (int, *Revision) {
+func (n *Node) findRevByKey(revs []Revision, keyName string, value string) (int, Revision) {
for i, rev := range revs {
- dataValue := reflect.ValueOf(rev.Config.Data)
- dataStruct := GetAttributeStructure(rev.Config.Data, keyName, 0)
+ dataValue := reflect.ValueOf(rev.GetData())
+ dataStruct := GetAttributeStructure(rev.GetData(), keyName, 0)
fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
@@ -207,50 +200,72 @@
return -1, nil
}
-func (cn *Node) get(rev *Revision, path string, depth int) interface{} {
+// get resolves path relative to rev and returns the data found there.
+//
+// An empty path returns doGet(rev, depth). Otherwise the first path segment
+// names a child field of the node's type:
+//   - keyed container: with a further segment, the child whose key matches it
+//     is resolved (nil when no child matches); without one, the data of every
+//     child is returned as a slice;
+//   - un-keyed container: the data of every child is returned as a slice, or
+//     an empty result when a further segment is given;
+//   - anything else: the lookup continues in the first child revision stored
+//     under that name, or returns nil when there is none.
+func (n *Node) get(rev Revision, path string, depth int) interface{} {
if path == "" {
- return cn.doGet(rev, depth)
+ return n.doGet(rev, depth)
}
partition := strings.SplitN(path, "/", 2)
name := partition[0]
- path = partition[1]
- field := ChildrenFields(cn.Type)[name]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
- if field.IsContainer {
+ names := ChildrenFields(n.Type)
+ field := names[name]
+
+ if field != nil && field.IsContainer {
if field.Key != "" {
- children := rev.Children[name]
+ children := rev.GetChildren()[name]
if path != "" {
partition = strings.SplitN(path, "/", 2)
key := partition[0]
path = ""
key = field.KeyFromStr(key).(string)
- _, childRev := cn.findRevByKey(children, field.Key, key)
- childNode := childRev.getNode()
- return childNode.get(childRev, path, depth)
+ if _, childRev := n.findRevByKey(children, field.Key, key); childRev == nil {
+ return nil
+ } else {
+ childNode := childRev.GetNode()
+ return childNode.get(childRev, path, depth)
+ }
} else {
var response []interface{}
for _, childRev := range children {
- childNode := childRev.getNode()
+ childNode := childRev.GetNode()
value := childNode.doGet(childRev, depth)
response = append(response, value)
}
return response
}
} else {
- childRev := rev.Children[name][0]
- childNode := childRev.getNode()
- return childNode.get(childRev, path, depth)
+ var response []interface{}
+ if path != "" {
+ // TODO: raise error
+ return response
+ }
+ for _, childRev := range rev.GetChildren()[name] {
+ childNode := childRev.GetNode()
+ value := childNode.doGet(childRev, depth)
+ response = append(response, value)
+ }
+ return response
}
+ } else {
+ c1 := rev.GetChildren()[name]
+ // This branch is also reached for names that are not child fields at
+ // all (field == nil); with no revisions under name, c1[0] would panic.
+ if len(c1) == 0 {
+ return nil
+ }
+ childRev := c1[0]
+ childNode := childRev.GetNode()
+ return childNode.get(childRev, path, depth)
}
return nil
}
-func (cn *Node) doGet(rev *Revision, depth int) interface{} {
+func (n *Node) doGet(rev Revision, depth int) interface{} {
msg := rev.Get(depth)
- if cn.Proxy != nil {
+ if n.Proxy != nil {
// TODO: invoke GET callback
fmt.Println("invoking proxy GET Callbacks")
}
@@ -260,7 +275,7 @@
//
// Update operation
//
-func (n *Node) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) *Revision {
+func (n *Node) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) Revision {
// FIXME: is this required ... a bit overkill to take out a "/"
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -268,63 +283,117 @@
var branch *Branch
var ok bool
- if branch, ok = n.Branches[txid]; !ok {
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
branch = makeBranch(n)
}
+ log.Debugf("Branch data : %+v, Passed data: %+v", branch.Latest.GetData(), data)
+
if path == "" {
return n.doUpdate(branch, data, strict)
}
- return &Revision{}
-}
-func (n *Node) doUpdate(branch *Branch, data interface{}, strict bool) *Revision {
- if reflect.TypeOf(data) != n.Type {
- // TODO raise error
- fmt.Errorf("data does not match type: %+v", n.Type)
+ // TODO missing some code here...
+ rev := branch.Latest
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
}
- // TODO: noChildren?
+ field := ChildrenFields(n.Type)[name]
+ var children []Revision
+
+ if field.IsContainer {
+ if path == "" {
+ fmt.Errorf("cannot update a list\n")
+ } else if field.Key != "" {
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ key = field.KeyFromStr(key).(string)
+ // TODO: does copy not work here? Should each item be cloned instead?
+ for _, v := range rev.GetChildren()[name] {
+ revCopy := reflect.ValueOf(v).Interface().(Revision)
+ children = append(children, revCopy)
+ }
+ idx, childRev := n.findRevByKey(children, field.Key, key)
+ childNode := childRev.GetNode()
+ newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ if newChildRev.GetHash() == childRev.GetHash() {
+ if newChildRev != childRev {
+ log.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
+ newChildRev.ClearHash()
+ }
+ return branch.Latest
+ }
+ if _, newKey := GetAttributeValue(newChildRev.GetData(), field.Key, 0); newKey.Interface().(string) != key {
+ fmt.Errorf("cannot change key field\n")
+ }
+ children[idx] = newChildRev
+ rev = rev.UpdateChildren(name, children, branch)
+ n.root.MakeLatest(branch, rev, nil)
+ return rev
+ } else {
+ fmt.Errorf("cannot index into container with no keys\n")
+ }
+ } else {
+ childRev := rev.GetChildren()[name][0]
+ childNode := childRev.GetNode()
+ newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ rev = rev.UpdateChildren(name, []Revision{newChildRev}, branch)
+ n.root.MakeLatest(branch, rev, nil)
+ return rev
+ }
+ return nil
+}
+
+// doUpdate replaces the data of branch's latest revision with data.
+//
+// It returns nil, after logging an error, when the dynamic type of data
+// differs from the node's Type. When data is deeply equal to the latest
+// revision's data nothing changes and the current latest revision is returned.
+// Otherwise a new revision is produced with UpdateData, made the latest one
+// through the root, and returned. strict currently only triggers a
+// placeholder message.
+func (n *Node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
+ // reflect.TypeOf yields the same type as reflect.ValueOf(x).Type() for a
+ // non-nil x, and returns nil instead of panicking when the Type is unset.
+ expected := reflect.TypeOf(n.Type)
+ log.Debugf("Comparing types - expected: %+v, actual: %+v", expected, reflect.TypeOf(data))
+
+ if reflect.TypeOf(data) != expected {
+ // TODO raise error
+ // fmt.Errorf only builds an error value; discarding it reported nothing.
+ log.Errorf("data does not match type: %+v", n.Type)
+ return nil
+ }
+
+ // TODO: validate that this actually works
+ //if n.hasChildren(data) {
+ // return nil
+ //}
if n.Proxy != nil {
// TODO: n.proxy.InvokeCallbacks(CallbackType.PRE_UPDATE, data)
fmt.Println("invoking proxy PRE_UPDATE Callbacks")
}
- if branch.Latest.getData() != data {
+ if !reflect.DeepEqual(branch.Latest.GetData(), data) {
if strict {
- // TODO: checkAccessViolations(data, branch.GetLatest.data)
+ // TODO: checkAccessViolations(data, Branch.GetLatest.data)
fmt.Println("checking access violations")
}
rev := branch.Latest.UpdateData(data, branch)
- n.makeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.POST_UPDATE, rev.data)
+ n.root.MakeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.
+ // POST_UPDATE, rev.data)
return rev
} else {
return branch.Latest
}
}
-// TODO: the python implementation has a method to check if the data has no children
-//func (n *SomeNode) noChildren(data interface{}) bool {
-// for fieldName, field := range ChildrenFields(n.Type) {
-// fieldValue := GetAttributeValue(data, fieldName)
-//
-// if fieldValue.IsValid() {
-// if field.IsContainer {
-// if len(fieldValue) > 0 {
-//
-// }
-// } else {
-//
-// }
-//
-// }
-// }
-//}
-
//
// Add operation
//
-func (n *Node) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) *Revision {
+func (n *Node) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) Revision {
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -335,7 +404,9 @@
var branch *Branch
var ok bool
- if branch, ok = n.Branches[txid]; !ok {
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
branch = makeBranch(n)
}
@@ -343,10 +414,15 @@
partition := strings.SplitN(path, "/", 2)
name := partition[0]
- path = partition[1]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
field := ChildrenFields(n.Type)[name]
- var children []*Revision
+ var children []Revision
if field.IsContainer {
if path == "" {
@@ -356,17 +432,21 @@
fmt.Println("invoking proxy PRE_ADD Callbacks")
}
- copy(children, rev.Children[name])
- key := GetAttributeValue(data, field.Key, 0)
+ for _, v := range rev.GetChildren()[name] {
+ revCopy := reflect.ValueOf(v).Interface().(Revision)
+ children = append(children, revCopy)
+ }
+ _, key := GetAttributeValue(data, field.Key, 0)
if _, rev := n.findRevByKey(children, field.Key, key.String()); rev != nil {
// TODO raise error
fmt.Errorf("duplicate key found: %s", key.String())
}
- childRev := n.makeNode(data, "").Latest()
+ childRev := n.MakeNode(data, "").Latest()
children = append(children, childRev)
rev := rev.UpdateChildren(name, children, branch)
- n.makeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.POST_ADD, rev.data)
+ n.root.MakeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.
+ // POST_ADD, rev.data)
return rev
} else {
fmt.Errorf("cannot add to non-keyed container\n")
@@ -374,15 +454,19 @@
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
- path = partition[1]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
key = field.KeyFromStr(key).(string)
- copy(children, rev.Children[name])
+ copy(children, rev.GetChildren()[name])
idx, childRev := n.findRevByKey(children, field.Key, key)
- childNode := childRev.getNode()
+ childNode := childRev.GetNode()
newChildRev := childNode.Add(path, data, txid, makeBranch)
children[idx] = newChildRev
rev := rev.UpdateChildren(name, children, branch)
- n.makeLatest(branch, rev, nil)
+ n.root.MakeLatest(branch, rev, nil)
return rev
} else {
fmt.Errorf("cannot add to non-keyed container\n")
@@ -396,7 +480,7 @@
//
// Remove operation
//
-func (n *Node) Remove(path string, txid string, makeBranch t_makeBranch) *Revision {
+func (n *Node) Remove(path string, txid string, makeBranch t_makeBranch) Revision {
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -406,7 +490,9 @@
}
var branch *Branch
var ok bool
- if branch, ok = n.Branches[txid]; !ok {
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
branch = makeBranch(n)
}
@@ -414,11 +500,15 @@
partition := strings.SplitN(path, "/", 2)
name := partition[0]
- path = partition[1]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
field := ChildrenFields(n.Type)[name]
- var children []*Revision
- post_anno := make(map[string]interface{})
+ var children []Revision
+ post_anno := make(map[CallbackType][]interface{})
if field.IsContainer {
if path == "" {
@@ -426,22 +516,32 @@
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
- path = partition[1]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
key = field.KeyFromStr(key).(string)
if path != "" {
- copy(children, rev.Children[name])
+ for _, v := range rev.GetChildren()[name] {
+ newV := reflect.ValueOf(v).Interface().(Revision)
+ children = append(children, newV)
+ }
idx, childRev := n.findRevByKey(children, field.Key, key)
- childNode := childRev.getNode()
+ childNode := childRev.GetNode()
newChildRev := childNode.Remove(path, txid, makeBranch)
children[idx] = newChildRev
rev := rev.UpdateChildren(name, children, branch)
- n.makeLatest(branch, rev, nil)
+ n.root.MakeLatest(branch, rev, nil)
return rev
} else {
- copy(children, rev.Children[name])
+ for _, v := range rev.GetChildren()[name] {
+ newV := reflect.ValueOf(v).Interface().(Revision)
+ children = append(children, newV)
+ }
idx, childRev := n.findRevByKey(children, field.Key, key)
if n.Proxy != nil {
- data := childRev.getData()
+ data := childRev.GetData()
fmt.Println("invoking proxy PRE_REMOVE Callbacks")
fmt.Printf("setting POST_REMOVE Callbacks : %+v\n", data)
} else {
@@ -449,7 +549,7 @@
}
children = append(children[:idx], children[idx+1:]...)
rev := rev.UpdateChildren(name, children, branch)
- n.makeLatest(branch, rev, post_anno)
+ n.root.MakeLatest(branch, rev, post_anno)
return rev
}
} else {
@@ -462,10 +562,154 @@
return nil
}
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+type t_makeBranch func(*Node) *Branch // callback that builds a branch for the given node; presumably the type of the makeBranch argument Remove invokes when no branch exists for a txid - TODO confirm
+
+// makeTxBranch creates a branch for transaction txid whose origin is the
+// latest revision of the NONE branch, registers it in n.Branches under txid
+// and returns it.
+func (n *Node) makeTxBranch(txid string) *Branch {
+	txBranch := NewBranch(n, txid, n.Branches[NONE].Latest, true)
+	n.Branches[txid] = txBranch
+	return txBranch
+}
+
+func (n *Node) deleteTxBranch(txid string) { // deleteTxBranch unregisters the branch stored under txid.
+ delete(n.Branches, txid) // the built-in delete is a no-op when txid is not present
+}
+
+// mergeChild returns a function that, given a child revision, merges the
+// child's node transaction branch when the revision belongs to transaction
+// txid, and otherwise hands the revision back untouched.
+func (n *Node) mergeChild(txid string, dryRun bool) func(Revision) Revision {
+	return func(rev Revision) Revision {
+		owner := rev.GetBranch()
+		if owner.Txid != txid {
+			// Revision was not produced by this transaction; nothing to merge.
+			return rev
+		}
+		return owner.Node.mergeTxBranch(txid, dryRun)
+	}
+}
+
+// mergeTxBranch three-way merges the branch of transaction txid into the NONE
+// branch, using the transaction branch's origin as the fork point. Unless
+// dryRun is set, the merged revision becomes the latest revision of the NONE
+// branch and the transaction branch is removed. The merged revision is
+// returned in both cases.
+func (n *Node) mergeTxBranch(txid string, dryRun bool) Revision {
+	src, dst := n.Branches[txid], n.Branches[NONE]
+
+	merged, changes := Merge3Way(src.Origin, src.Latest, dst.Latest, n.mergeChild(txid, dryRun), dryRun)
+	if dryRun {
+		return merged
+	}
+
+	n.root.MakeLatest(dst, merged, changes)
+	delete(n.Branches, txid)
+	return merged
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+//func (n *Node) diff(hash1, hash2, txid string) {
+// branch := n.Branches[txid]
+// rev1 := branch.get(hash1)
+// rev2 := branch.get(hash2)
+//
+// if rev1.GetHash() == rev2.GetHash() {
+// // empty patch
+// } else {
+// // translate data to json and generate patch
+// patch, err := jsonpatch.MakePatch(rev1.GetData(), rev2.GetData())
+// patch.
+// }
+//}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tag utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+// TODO: is tag mgmt used in the python implementation? Need to validate
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func (n *Node) hasChildren(data interface{}) bool {
+ for fieldName, field := range ChildrenFields(n.Type) {
+ _, fieldValue := GetAttributeValue(data, fieldName, 0)
+
+ if (field.IsContainer && fieldValue.Len() > 0) || !fieldValue.IsNil() {
+ log.Error("cannot update external children")
+ return true
+ }
+ }
+
+ return false
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+func (n *Node) GetProxy(path string, exclusive bool) *Proxy { // GetProxy resolves path starting at this node and returns the proxy of the node it ends on.
+ return n.getProxy(path, n.root, path, exclusive) // path is passed twice: once to be consumed segment by segment, once as the untouched full path
+}
+// getProxy consumes path one segment at a time, descending into the child
+// node each segment selects on the latest revision of the NONE branch (for a
+// keyed container the following segment is the child key), and returns the
+// proxy of the node on which the path ends. root and fullPath are handed
+// unchanged to every recursive call.
+//
+// It returns nil when the path stops on a container field, indexes a
+// container that has no key, or names a child that does not exist; the first
+// two cases are logged.
+func (n *Node) getProxy(path string, root *Root, fullPath string, exclusive bool) *Proxy {
+	for strings.HasPrefix(path, "/") {
+		path = path[1:]
+	}
+	if path == "" {
+		// NOTE(review): the proxy is built from n.root and the (now empty)
+		// remaining path rather than from the root/fullPath arguments -
+		// confirm this is intended.
+		return n.makeProxy(n.root, path, exclusive)
+	}
+
+	rev := n.Branches[NONE].Latest
+
+	// SplitN returns a single element when no further "/" is present, so the
+	// remainder must only be read when it exists (same guard as Remove).
+	partition := strings.SplitN(path, "/", 2)
+	name := partition[0]
+	path = ""
+	if len(partition) > 1 {
+		path = partition[1]
+	}
+
+	field := ChildrenFields(n.Type)[name]
+	if !field.IsContainer {
+		children := rev.GetChildren()[name]
+		if len(children) == 0 {
+			return nil
+		}
+		return children[0].GetNode().getProxy(path, root, fullPath, exclusive)
+	}
+
+	if path == "" {
+		log.Error("cannot proxy a container field")
+		return nil
+	}
+	if field.Key == "" {
+		log.Error("cannot index into container with no keys")
+		return nil
+	}
+
+	partition = strings.SplitN(path, "/", 2)
+	key := partition[0]
+	path = ""
+	if len(partition) > 1 {
+		path = partition[1]
+	}
+	key = field.KeyFromStr(key).(string)
+
+	_, childRev := n.findRevByKey(rev.GetChildren()[name], field.Key, key)
+	if childRev == nil {
+		// No child with that key: nothing to proxy.
+		return nil
+	}
+	return childRev.GetNode().getProxy(path, root, fullPath, exclusive)
+}
+
+// makeProxy returns the node's proxy, creating it on first use. When a proxy
+// already exists and is exclusive, an error is logged and the existing proxy
+// is still returned.
+func (n *Node) makeProxy(root *Root, fullPath string, exclusive bool) *Proxy {
+	switch {
+	case n.Proxy == nil:
+		n.Proxy = NewProxy(root, n, fullPath, exclusive)
+	case n.Proxy.Exclusive:
+		log.Error("node is already owned exclusively")
+	}
+	return n.Proxy
+}
+
+// makeEventBus returns the node's event bus, allocating it on first use.
+func (n *Node) makeEventBus() *EventBus {
+	if n.EventBus != nil {
+		return n.EventBus
+	}
+	n.EventBus = NewEventBus()
+	return n.EventBus
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Persistence Loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
func (n *Node) LoadLatest(kvStore *Backend, hash string) {
branch := NewBranch(n, "", nil, n.AutoPrune)
pr := &PersistedRevision{}
- rev := pr.load(branch, kvStore, n.Type, hash)
- n.makeLatest(branch, rev.Revision, nil)
+ rev := pr.Load(branch, kvStore, n.Type, hash)
+ n.MakeLatest(branch, rev, nil)
n.Branches[NONE] = branch
}
diff --git a/db/model/node_test.go b/db/model/node_test.go
index cc17981..29f4c7c 100644
--- a/db/model/node_test.go
+++ b/db/model/node_test.go
@@ -19,10 +19,9 @@
"crypto/md5"
"fmt"
"github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha/protos/go/bbf_fiber"
- "github.com/opencord/voltha/protos/go/common"
- "github.com/opencord/voltha/protos/go/openflow_13"
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
"testing"
)
@@ -39,34 +38,33 @@
},
}
data := &voltha.Device{
- Id: "Config-SomeNode-01-new-test",
- Type: "simulated_olt",
- Root: true,
- ParentId: "",
- ParentPortNo: 0,
- Vendor: "voltha-test",
- Model: "GetLatest-voltha-simulated-olt",
- HardwareVersion: "1.0.0",
- FirmwareVersion: "1.0.0",
- Images: &voltha.Images{},
- SerialNumber: "abcdef-123456",
- VendorId: "DEADBEEF-INC",
- Adapter: "simulated_olt",
- Vlan: 1234,
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- ExtraArgs: "",
- ProxyAddress: &voltha.Device_ProxyAddress{},
- AdminState: voltha.AdminState_PREPROVISIONED,
- OperStatus: common.OperStatus_ACTIVE,
- Reason: "",
- ConnectStatus: common.ConnectStatus_REACHABLE,
- Custom: &any.Any{},
- Ports: ports,
- Flows: &openflow_13.Flows{},
- FlowGroups: &openflow_13.FlowGroups{},
- PmConfigs: &voltha.PmConfigs{},
- ChannelTerminations: []*bbf_fiber.ChannelterminationConfig{},
- ImageDownloads: []*voltha.ImageDownload{},
+ Id: "Config-SomeNode-01-new-test",
+ Type: "simulated_olt",
+ Root: true,
+ ParentId: "",
+ ParentPortNo: 0,
+ Vendor: "voltha-test",
+ Model: "GetLatest-voltha-simulated-olt",
+ HardwareVersion: "1.0.0",
+ FirmwareVersion: "1.0.0",
+ Images: &voltha.Images{},
+ SerialNumber: "abcdef-123456",
+ VendorId: "DEADBEEF-INC",
+ Adapter: "simulated_olt",
+ Vlan: 1234,
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ ExtraArgs: "",
+ ProxyAddress: &voltha.Device_ProxyAddress{},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ OperStatus: common.OperStatus_ACTIVE,
+ Reason: "",
+ ConnectStatus: common.ConnectStatus_REACHABLE,
+ Custom: &any.Any{},
+ Ports: ports,
+ Flows: &openflow_13.Flows{},
+ FlowGroups: &openflow_13.FlowGroups{},
+ PmConfigs: &voltha.PmConfigs{},
+ ImageDownloads: []*voltha.ImageDownload{},
}
root := &Root{}
txid := fmt.Sprintf("%x", md5.Sum([]byte("node_transaction_id")))
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
new file mode 100644
index 0000000..26d25a0
--- /dev/null
+++ b/db/model/non_persisted_revision.go
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "bytes"
+ "crypto/md5"
+ "fmt"
+ "reflect"
+ "sort"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+var (
+ RevisionCache = make(map[string]interface{}) // package-level cache keyed by hash; Finalize stores both revisions and their *DataRevision configs in it
+)
+
+type NonPersistedRevision struct { // NonPersistedRevision is the Revision implementation provided by this file.
+ Config *DataRevision // data payload; GetData returns Config.Data
+ Children map[string][]Revision // child revisions grouped by field name
+ Hash string // content hash assigned by Finalize (see hashContent)
+ Branch *Branch // branch this revision belongs to; GetNode returns Branch.Node
+ WeakRef string // not referenced by any method in this file
+}
+
+// NewNonPersistedRevision builds a revision on branch that wraps data in a new
+// DataRevision, adopts children as given, and is finalized (hashed and
+// cached) before being returned.
+func NewNonPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+	npr := &NonPersistedRevision{
+		Branch:   branch,
+		Config:   NewDataRevision(data),
+		Children: children,
+	}
+	npr.Finalize()
+	return npr
+}
+
+func (npr *NonPersistedRevision) SetConfig(config *DataRevision) { // SetConfig replaces the revision's Config; the hash is not recomputed here.
+ npr.Config = config
+}
+
+func (npr *NonPersistedRevision) GetConfig() *DataRevision { // GetConfig returns the revision's Config, which may be nil.
+ return npr.Config
+}
+
+func (npr *NonPersistedRevision) SetChildren(children map[string][]Revision) { // SetChildren replaces the Children map; the map is stored as given, not copied.
+ npr.Children = children
+}
+
+func (npr *NonPersistedRevision) GetChildren() map[string][]Revision { // GetChildren returns the Children map itself, not a copy.
+ return npr.Children
+}
+
+func (npr *NonPersistedRevision) SetHash(hash string) { // SetHash overwrites the stored hash.
+ npr.Hash = hash
+}
+
+func (npr *NonPersistedRevision) GetHash() string { // GetHash returns the stored hash ("" after ClearHash).
+ return npr.Hash
+}
+
+func (npr *NonPersistedRevision) ClearHash() { // ClearHash resets the stored hash to the empty string.
+ npr.Hash = ""
+}
+
+func (npr *NonPersistedRevision) SetBranch(branch *Branch) { // SetBranch re-homes the revision onto branch.
+ npr.Branch = branch
+}
+
+func (npr *NonPersistedRevision) GetBranch() *Branch { // GetBranch returns the branch the revision is attached to.
+ return npr.Branch
+}
+
+// GetData returns the payload held by the revision's Config, or nil when the
+// revision has no Config.
+func (npr *NonPersistedRevision) GetData() interface{} {
+	if cfg := npr.Config; cfg != nil {
+		return cfg.Data
+	}
+	return nil
+}
+
+func (npr *NonPersistedRevision) GetNode() *Node { // GetNode returns the node that owns the revision's branch.
+ return npr.Branch.Node // dereferences Branch without a nil check
+}
+
+// Finalize computes and stores the revision's content hash, then registers
+// the revision and its Config in RevisionCache. When a Config with the same
+// hash is already cached, the revision is switched to the cached instance so
+// identical payloads are shared. A revision without a Config is hashed and
+// cached on its own.
+func (npr *NonPersistedRevision) Finalize() {
+	npr.SetHash(npr.hashContent())
+
+	if _, exists := RevisionCache[npr.Hash]; !exists {
+		RevisionCache[npr.Hash] = npr
+	}
+
+	// hashContent and GetData both accept a nil Config, so do the same here
+	// instead of dereferencing it unconditionally.
+	if npr.Config == nil {
+		return
+	}
+
+	if cached, exists := RevisionCache[npr.Config.Hash]; exists {
+		npr.Config = cached.(*DataRevision)
+	} else {
+		RevisionCache[npr.Config.Hash] = npr.Config
+	}
+}
+
+// hashContent derives the revision hash: the Config hash (when a Config is
+// present) followed by the non-empty hashes of all non-nil children, visited
+// in sorted field-name order, is MD5-summed and the first 12 hex characters
+// of the digest are returned.
+func (npr *NonPersistedRevision) hashContent() string {
+	var content bytes.Buffer
+
+	if npr.Config != nil {
+		content.WriteString(npr.Config.Hash)
+	}
+
+	// Map iteration order is random; sort the field names so the hash is
+	// stable.
+	var fieldNames []string
+	for fieldName := range npr.Children {
+		fieldNames = append(fieldNames, fieldName)
+	}
+	sort.Strings(fieldNames)
+
+	for _, fieldName := range fieldNames {
+		for _, child := range npr.Children[fieldName] {
+			if child == nil {
+				continue
+			}
+			if childHash := child.GetHash(); childHash != "" {
+				content.WriteString(childHash)
+			}
+		}
+	}
+
+	digest := md5.Sum(content.Bytes())
+	return fmt.Sprintf("%x", digest)[:12]
+}
+
+// Get returns the revision's data. When depth is non-zero, the data of the
+// child revisions (fetched with depth-1) is merged into the matching fields:
+// container fields get every child appended, single-valued fields are set to
+// the data of their first child, if there is one.
+func (npr *NonPersistedRevision) Get(depth int) interface{} {
+	originalData := npr.GetData()
+	// NOTE(review): this reflect round-trip yields the very value it was
+	// given, so when the data is a pointer the merge below writes into the
+	// revision's own payload rather than into a copy - confirm whether a
+	// clone was intended.
+	data := reflect.ValueOf(originalData).Interface()
+
+	if depth != 0 {
+		for fieldName, field := range ChildrenFields(npr.GetData()) {
+			childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
+			if field.IsContainer {
+				for _, rev := range npr.Children[fieldName] {
+					childData := rev.Get(depth - 1)
+					childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
+				}
+			} else if revs := npr.Children[fieldName]; len(revs) > 0 && revs[0] != nil {
+				// A single-valued field is not a slice: reflect.Append would
+				// panic on it, so the child's data replaces the holder. The
+				// holder is left untouched when there is no child or the
+				// child data does not fit the field.
+				childValue := reflect.ValueOf(revs[0].Get(depth - 1))
+				if childValue.IsValid() && childValue.Type().AssignableTo(childDataHolder.Type()) {
+					childDataHolder = childValue
+				}
+			}
+			// Write the merged child data back into the returned object.
+			reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
+		}
+	}
+	return data
+}
+
+// UpdateData returns a new revision on branch that is a shallow copy of npr
+// with its Config replaced by a fresh DataRevision built from data. The copy
+// is finalized before being returned; npr itself is left unchanged.
+func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+	updated := *npr
+	updated.SetBranch(branch)
+	log.Debugf("newRev config : %+v, npr: %+v", updated.GetConfig(), npr)
+	updated.SetConfig(NewDataRevision(data))
+	updated.Finalize()
+
+	return &updated
+}
+
+// UpdateChildren returns a new revision on branch that is a shallow copy of
+// npr whose children for field name are replaced by children. The other
+// fields' child lists are copied into a new map (fields with no children are
+// left out); npr and its Children map are not modified. The copy is finalized
+// before being returned.
+func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+	newChildren := make(map[string][]Revision, len(npr.Children)+1)
+	for entryName, childrenEntry := range npr.Children {
+		// The entry for name is overwritten below, and empty entries were
+		// never carried over.
+		if entryName == name || len(childrenEntry) == 0 {
+			continue
+		}
+		// A plain slice copy: going through reflect per element added nothing
+		// and panicked on a nil entry.
+		newChildren[entryName] = append([]Revision(nil), childrenEntry...)
+	}
+	newChildren[name] = children
+
+	newRev := *npr
+	newRev.SetBranch(branch)
+	newRev.SetChildren(newChildren)
+	newRev.Finalize()
+
+	return &newRev
+}
+
+// UpdateAllChildren returns a new revision on branch that is a shallow copy
+// of npr with its whole Children map replaced by children. The copy is
+// finalized before being returned; npr itself is left unchanged.
+func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+	// Copy the struct by dereferencing. Asserting the pointer held by
+	// reflect.ValueOf(npr).Interface() to the struct type (no Elem) can never
+	// succeed and panicked on every call.
+	newRev := *npr
+	newRev.SetBranch(branch)
+	newRev.SetChildren(children)
+	newRev.Finalize()
+
+	return &newRev
+}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index 81d3563..805557a 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -23,23 +23,25 @@
"github.com/golang/protobuf/proto"
"io/ioutil"
"reflect"
+ "time"
)
type PersistedRevision struct {
- *Revision
+ Revision
Compress bool
kvStore *Backend
}
-func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]*Revision) *PersistedRevision {
+func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
- pr.kvStore = branch.node.root.KvStore
- pr.Revision = NewRevision(branch, data, children)
+ pr.kvStore = branch.Node.root.KvStore
+ pr.Revision = NewNonPersistedRevision(branch, data, children)
+ pr.Finalize()
return pr
}
-func (pr *PersistedRevision) finalize() {
- pr.Revision.finalize()
+func (pr *PersistedRevision) Finalize() {
+ //pr.Revision.Finalize()
pr.store()
}
@@ -49,23 +51,23 @@
}
func (pr *PersistedRevision) store() {
- if ok, _ := pr.kvStore.Get(pr.Revision.Hash); ok != nil {
+ if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
return
}
pr.storeConfig()
childrenHashes := make(map[string][]string)
- for fieldName, children := range pr.Children {
+ for fieldName, children := range pr.GetChildren() {
hashes := []string{}
for _, rev := range children {
- hashes = append(hashes, rev.Hash)
+ hashes = append(hashes, rev.GetHash())
}
childrenHashes[fieldName] = hashes
}
data := &revData{
Children: childrenHashes,
- Config: pr.Config.Hash,
+ Config: pr.GetConfig().Hash,
}
if blob, err := json.Marshal(data); err != nil {
// TODO report error
@@ -77,12 +79,14 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.Hash, blob)
+ pr.kvStore.Put(pr.GetHash(), blob)
}
}
-func (pr *PersistedRevision) load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) *PersistedRevision {
+func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
blob, _ := kvStore.Get(hash)
+
+ start := time.Now()
output := blob.Value.([]byte)
var data revData
if pr.Compress {
@@ -99,32 +103,40 @@
fmt.Errorf("problem to unmarshal data - %s", err.Error())
}
+ stop := time.Now()
+ GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
configHash := data.Config
configData := pr.loadConfig(kvStore, msgClass, configHash)
- assembledChildren := make(map[string][]*Revision)
+
+ assembledChildren := make(map[string][]Revision)
childrenHashes := data.Children
- node := branch.node
+ node := branch.Node
for fieldName, child := range ChildrenFields(msgClass) {
- var children []*Revision
+ var children []Revision
for _, childHash := range childrenHashes[fieldName] {
- //fmt.Printf("child class type: %+v", reflect.New(n).Elem().Interface())
- childNode := node.makeNode(reflect.New(child.ClassType).Elem().Interface(), "")
+ childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
childNode.LoadLatest(kvStore, childHash)
childRev := childNode.Latest()
children = append(children, childRev)
}
assembledChildren[fieldName] = children
}
+
rev := NewPersistedRevision(branch, configData, assembledChildren)
return rev
}
+// assignValue returns b. The first argument is received by value, so the
+// caller's variable is never modified.
+func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
+	return b
+}
+
func (pr *PersistedRevision) storeConfig() {
- if ok, _ := pr.kvStore.Get(pr.Config.Hash); ok != nil {
+ if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
return
}
- if blob, err := proto.Marshal(pr.Config.Data.(proto.Message)); err != nil {
+ if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
// TODO report error
} else {
if pr.Compress {
@@ -134,12 +146,13 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.Config.Hash, blob)
+ pr.kvStore.Put(pr.GetConfig().Hash, blob)
}
}
func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
blob, _ := kvStore.Get(hash)
+ start := time.Now()
output := blob.Value.([]byte)
if pr.Compress {
@@ -161,5 +174,50 @@
}
}
+ stop := time.Now()
+
+ GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
return data.Interface()
}
+
+// UpdateData delegates to the wrapped revision's UpdateData and wraps the
+// result in a new PersistedRevision that inherits pr's Compress flag and
+// kvStore. The wrapper is finalized before being returned.
+func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+	updated := &PersistedRevision{
+		Revision: pr.Revision.UpdateData(data, branch),
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+	}
+	updated.Finalize()
+	return updated
+}
+
+// UpdateChildren delegates to the wrapped revision's UpdateChildren and wraps
+// the result in a new PersistedRevision that inherits pr's Compress flag and
+// kvStore. The wrapper is finalized before being returned.
+func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+	updated := &PersistedRevision{
+		Revision: pr.Revision.UpdateChildren(name, children, branch),
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+	}
+	updated.Finalize()
+	return updated
+}
+
+// UpdateAllChildren delegates to the wrapped revision's UpdateAllChildren and
+// wraps the result in a new PersistedRevision that inherits pr's Compress
+// flag and kvStore. The wrapper is finalized before being returned.
+func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+	updated := &PersistedRevision{
+		Revision: pr.Revision.UpdateAllChildren(children, branch),
+		Compress: pr.Compress,
+		kvStore:  pr.kvStore,
+	}
+	updated.Finalize()
+	return updated
+}
diff --git a/db/model/profiling.go b/db/model/profiling.go
new file mode 100644
index 0000000..9d13d5a
--- /dev/null
+++ b/db/model/profiling.go
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "sync"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+type profiling struct { // profiling accumulates timing totals and sample counts; the callers visible in this change pass durations in seconds.
+ DatabaseRetrieveTime float64 // total added through AddToDatabaseRetrieveTime
+ DatabaseRetrieveCount int // number of AddToDatabaseRetrieveTime calls
+ InMemoryModelTime float64 // total added through AddToInMemoryModelTime
+ InMemoryModelCount int // number of AddToInMemoryModelTime calls
+ InMemoryProcessTime float64 // total added through AddToInMemoryProcessTime
+ DatabaseStoreTime float64 // total added through AddToDatabaseStoreTime
+}
+
+var profiling_instance *profiling // shared instance, created on the first GetProfiling call
+var profiling_once sync.Once // ensures profiling_instance is created exactly once
+
+func GetProfiling() *profiling { // GetProfiling returns the shared profiling instance, allocating it on first use.
+ profiling_once.Do(func() {
+ profiling_instance = &profiling{}
+ })
+ return profiling_instance
+}
+
+// AddToDatabaseRetrieveTime records one database retrieval: it bumps the
+// retrieval count and adds period to the retrieval time total.
+func (p *profiling) AddToDatabaseRetrieveTime(period float64) {
+	p.DatabaseRetrieveCount++
+	p.DatabaseRetrieveTime += period
+}
+// AddToInMemoryModelTime records one in-memory modeling step: it bumps the
+// modeling count and adds period to the modeling time total.
+func (p *profiling) AddToInMemoryModelTime(period float64) {
+	p.InMemoryModelCount++
+	p.InMemoryModelTime += period
+}
+func (p *profiling) AddToInMemoryProcessTime(period float64) { // AddToInMemoryProcessTime adds period to the in-memory processing total (no count is kept).
+ p.InMemoryProcessTime += period
+}
+func (p *profiling) AddToDatabaseStoreTime(period float64) { // AddToDatabaseStoreTime adds period to the database store total (no count is kept).
+ p.DatabaseStoreTime += period
+}
+
+// Report logs the database-retrieval and in-memory-modeling totals, their
+// sample counts and the resulting averages at info level. An average is
+// reported as 0 when no sample has been recorded.
+func (p *profiling) Report() {
+	// Dividing by a zero count would put NaN into the report.
+	average := func(total float64, count int) float64 {
+		if count == 0 {
+			return 0
+		}
+		return total / float64(count)
+	}
+
+	log.Infof("[ Profiling Report ]")
+	log.Infof("Database Retrieval : %f", p.DatabaseRetrieveTime)
+	log.Infof("Database Retrieval Count : %d", p.DatabaseRetrieveCount)
+	log.Infof("Avg Database Retrieval : %f", average(p.DatabaseRetrieveTime, p.DatabaseRetrieveCount))
+	log.Infof("In-Memory Modeling : %f", p.InMemoryModelTime)
+	log.Infof("In-Memory Modeling Count: %d", p.InMemoryModelCount)
+	log.Infof("Avg In-Memory Modeling : %f", average(p.InMemoryModelTime, p.InMemoryModelCount))
+}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 82ab113..8085eaf 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -21,6 +21,28 @@
"strings"
)
+type OperationContext struct { // OperationContext bundles the values handed to NewOperationContext; presumably describes one proxy operation for callbacks - TODO confirm against callers
+ Path string // as passed to NewOperationContext
+ Data interface{} // initial value from NewOperationContext; replaced by Update
+ FieldName string // as passed to NewOperationContext
+ ChildKey string // as passed to NewOperationContext
+}
+
+// NewOperationContext returns a new OperationContext populated with the given
+// path, data, field name and child key.
+func NewOperationContext(path string, data interface{}, fieldName string, childKey string) *OperationContext {
+	return &OperationContext{
+		Path:      path,
+		Data:      data,
+		FieldName: fieldName,
+		ChildKey:  childKey,
+	}
+}
+
+func (oc *OperationContext) Update(data interface{}) *OperationContext { // Update replaces Data in place and returns the same context so calls can be chained.
+ oc.Data = data
+ return oc
+}
+
type Proxy struct {
Root *Root
Node *Node
@@ -87,16 +109,16 @@
}
func (p *Proxy) openTransaction() *Transaction {
- txid := p.Root.makeTxBranch()
+ txid := p.Root.MakeTxBranch()
return NewTransaction(p, txid)
}
func (p *Proxy) commitTransaction(txid string) {
- p.Root.foldTxBranch(txid)
+ p.Root.FoldTxBranch(txid)
}
func (p *Proxy) cancelTransaction(txid string) {
- p.Root.deleteTxBranch(txid)
+ p.Root.DeleteTxBranch(txid)
}
func (p *Proxy) RegisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
new file mode 100644
index 0000000..1824d3f
--- /dev/null
+++ b/db/model/proxy_test.go
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-go/protos/voltha"
+ "testing"
+ "github.com/opencord/voltha-go/common/log"
+ "strconv"
+ "reflect"
+ "github.com/google/uuid"
+ "encoding/hex"
+ "encoding/json"
+)
+
+type proxyTest struct { // proxyTest holds the state shared by the Test_Proxy_* functions in this file.
+ Root *Root // set by Test_Proxy_0_GetRootProxy from Root.Load
+ Backend *Backend // set by Test_Proxy_0_GetRootProxy from NewBackend
+ Proxy *Proxy // root proxy obtained in Test_Proxy_0_GetRootProxy and used by every later test
+ DbPrefix string // passed to NewBackend as its last argument
+ DbType string // passed to NewBackend as its first argument
+ DbHost string // passed to NewBackend
+ DbPort int // passed to NewBackend
+ DbTimeout int // passed to NewBackend
+}
+
+var (
+ pt = &proxyTest{ // shared fixture; the tests expect a KV store reachable at DbHost:DbPort
+ DbPrefix: "service/voltha/data/core/0001",
+ DbType: "etcd",
+ //DbHost: "10.102.58.0",
+ DbHost: "localhost",
+ DbPort: 2379,
+ DbTimeout: 5,
+ }
+ devId string // id of the device created by Test_Proxy_3_AddDevice and removed by Test_Proxy_7_RemoveDevice
+ targetDeviceId string // id of the first device returned in Test_Proxy_1_GetDevices
+)
+
+func init() { // init installs a console logger for this test file; a setup failure is fatal.
+ if _, err := log.SetLogger(log.CONSOLE, 0, log.Fields{"instanceId": "proxy_test"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("cannot setup logging")
+ }
+ defer log.CleanUp() // NOTE(review): a defer inside init fires as soon as init returns, i.e. before any test runs - confirm this is intended
+
+}
+// Test_Proxy_0_GetRootProxy connects the shared fixture to the backend, loads
+// the root from it, prints the profiling report and stores the root proxy
+// used by the remaining tests.
+func Test_Proxy_0_GetRootProxy(t *testing.T) {
+	pt.Backend = NewBackend(pt.DbType, pt.DbHost, pt.DbPort, pt.DbTimeout, pt.DbPrefix)
+
+	volthaType := &voltha.Voltha{}
+	pt.Root = NewRoot(volthaType, pt.Backend, nil).Load(volthaType)
+
+	GetProfiling().Report()
+
+	pt.Proxy = pt.Root.Node.GetProxy("/", false)
+}
+
+// Test_Proxy_1_GetDevices lists /devices through the shared proxy and stores
+// the id of the first device in targetDeviceId for the tests that follow.
+func Test_Proxy_1_GetDevices(t *testing.T) {
+	// Checked assertions: a nil or unexpectedly typed result fails this test
+	// instead of panicking the whole test binary.
+	devices, ok := pt.Proxy.Get("/devices", 1, false, "").([]interface{})
+	if !ok || len(devices) == 0 {
+		t.Error("there are no available devices to retrieve")
+		return
+	}
+
+	first, ok := devices[0].(*voltha.Device)
+	if !ok {
+		t.Errorf("unexpected device entry type: %T", devices[0])
+		return
+	}
+
+	// Save the target device id for later tests
+	targetDeviceId = first.Id
+	t.Logf("retrieved devices: %+v", devices)
+}
+
+// Test_Proxy_2_GetDevice fetches the target device twice - once through its
+// /ports sub-path with depth 1, once directly with depth 0 - and logs both
+// results.
+func Test_Proxy_2_GetDevice(t *testing.T) {
+	devicePath := "/devices/" + targetDeviceId
+
+	withPorts := pt.Proxy.Get(devicePath+"/ports", 1, false, "")
+	t.Logf("retrieved device with ports: %+v", withPorts)
+
+	plain := pt.Proxy.Get(devicePath, 0, false, "")
+	t.Logf("retrieved device: %+v", plain)
+}
+
+//func Test_Proxy_3_AddDevice(t *testing.T) {
+// //ports := []*voltha.Port{
+// // {
+// // PortNo: 123,
+// // Label: "test-port-0",
+// // Type: voltha.Port_PON_OLT,
+// // AdminState: common.AdminState_ENABLED,
+// // OperStatus: common.OperStatus_ACTIVE,
+// // DeviceId: "etcd_port-0-device-id",
+// // Peers: []*voltha.Port_PeerPort{},
+// // },
+// //}
+// devIdBin, _ := uuid.New().MarshalBinary()
+// devId := hex.EncodeToString(devIdBin)[:12]
+//
+// device := &voltha.Device{
+// Id: devId,
+// Type: "simulated_olt",
+// //Root: true,
+// //ParentId: "",
+// //ParentPortNo: 0,
+// //Vendor: "voltha-test",
+// //Model: "latest-voltha-simulated-olt",
+// //HardwareVersion: "1.0.0",
+// //FirmwareVersion: "1.0.0",
+// //Images: &voltha.Images{},
+// //SerialNumber: "abcdef-123456",
+// //VendorId: "DEADBEEF-INC",
+// //Adapter: "simulated_olt",
+// //Vlan: 1234,
+// Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+// //ExtraArgs: "",
+// //ProxyAddress: &voltha.Device_ProxyAddress{},
+// AdminState: voltha.AdminState_PREPROVISIONED,
+// //OperStatus: common.OperStatus_ACTIVE,
+// //Reason: "",
+// //ConnectStatus: common.ConnectStatus_REACHABLE,
+// //Custom: &any.Any{},
+// //Ports: ports,
+// //Flows: &openflow_13.Flows{},
+// //FlowGroups: &openflow_13.FlowGroups{},
+// //PmConfigs: &voltha.PmConfigs{},
+// //ImageDownloads: []*voltha.ImageDownload{},
+// }
+//
+// //if retrieved := pt.Proxy.Get("/devices/00019b09a90bbe17", 0, false, ""); retrieved == nil {
+// // t.Error("Failed to get device")
+// //} else {
+// // devIdBin, _ := uuid.New().MarshalBinary()
+// // devId = "0001" + hex.EncodeToString(devIdBin)[:12]
+// // newDevice := Clone(retrieved).(*voltha.Device)
+// // newDevice.Id = devId
+//
+// if added := pt.Proxy.Add("/devices", device, ""); added == nil {
+// t.Error("Failed to add device")
+// } else {
+// t.Logf("Added device : %+v", added)
+// }
+// //}
+//
+//}
+// Test_Proxy_3_AddDevice adds a minimal simulated OLT under /devices with a
+// freshly generated id, which is kept in devId for the later tests.
+func Test_Proxy_3_AddDevice(t *testing.T) {
+	rawId, _ := uuid.New().MarshalBinary()
+	devId = "0001" + hex.EncodeToString(rawId)[:12]
+
+	added := pt.Proxy.Add("/devices", &voltha.Device{
+		Id:         devId,
+		Type:       "simulated_olt",
+		Address:    &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+		AdminState: voltha.AdminState_PREPROVISIONED,
+	}, "")
+	if added == nil {
+		t.Error("Failed to add device")
+		return
+	}
+	t.Logf("Added device : %+v", added)
+}
+
+// Test_Proxy_4_CheckAddedDevice verifies that the device added under devId
+// can be read back, and logs it as JSON.
+func Test_Proxy_4_CheckAddedDevice(t *testing.T) {
+	found := pt.Proxy.Get("/devices/"+devId, 0, false, "")
+	if !reflect.ValueOf(found).IsValid() {
+		t.Error("Failed to find added device")
+		return
+	}
+
+	djson, _ := json.Marshal(found)
+	t.Logf("Found device: count: %s", djson)
+}
+
+func Test_Proxy_5_UpdateDevice(t *testing.T) {
+ if retrieved := pt.Proxy.Get("/devices/" + targetDeviceId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get device")
+ } else {
+ var fwVersion int
+ if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
+ fwVersion += 1
+ }
+
+ cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
+ cloned.FirmwareVersion = strconv.Itoa(fwVersion)
+ t.Logf("Before update : %+v", cloned)
+
+ if afterUpdate := pt.Proxy.Update("/devices/" + targetDeviceId, &cloned, false, ""); afterUpdate == nil {
+ t.Error("Failed to update device")
+ } else {
+ t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
+ }
+ }
+}
+
+// Test_Proxy_6_CheckUpdatedDevice re-reads the target device and logs it; it
+// makes no assertion.
+func Test_Proxy_6_CheckUpdatedDevice(t *testing.T) {
+	updatedPath := "/devices/" + targetDeviceId
+	t.Logf("content of updated device: %+v", pt.Proxy.Get(updatedPath, 0, false, ""))
+}
+
+// Test_Proxy_7_RemoveDevice removes the device that was added under devId.
+func Test_Proxy_7_RemoveDevice(t *testing.T) {
+	removed := pt.Proxy.Remove("/devices/"+devId, "")
+	if removed == nil {
+		t.Error("Failed to remove device")
+		return
+	}
+	t.Logf("Removed device : %+v", removed)
+}
+
+// Test_Proxy_8_CheckRemovedDevice verifies that the device stored under devId
+// can no longer be read back.
+func Test_Proxy_8_CheckRemovedDevice(t *testing.T) {
+	leftover := pt.Proxy.Get("/devices/"+devId, 0, false, "")
+	if !reflect.ValueOf(leftover).IsValid() {
+		t.Logf("Device was removed: %s", devId)
+		return
+	}
+
+	djson, _ := json.Marshal(leftover)
+	t.Errorf("Device was not removed - %s", djson)
+}
diff --git a/db/model/revision.go b/db/model/revision.go
index 07c3388..44f97b2 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -15,144 +15,21 @@
*/
package model
-import (
- "bytes"
- "crypto/md5"
- "fmt"
- "reflect"
- "sort"
-)
-
-var (
- RevisionCache = make(map[string]interface{})
-)
-
-type Revision struct {
- Config *DataRevision
- Children map[string][]*Revision
- Hash string
- branch *Branch
- WeakRef string
-}
-
-func NewRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
- cr := &Revision{}
- cr.branch = branch
- cr.Config = NewDataRevision(data)
- cr.Children = children
- cr.finalize()
-
- return cr
-}
-
-func (cr *Revision) finalize() {
- cr.Hash = cr.hashContent()
-
- if _, exists := RevisionCache[cr.Hash]; !exists {
- RevisionCache[cr.Hash] = cr
- }
- if _, exists := RevisionCache[cr.Config.Hash]; !exists {
- RevisionCache[cr.Config.Hash] = cr.Config
- } else {
- cr.Config = RevisionCache[cr.Config.Hash].(*DataRevision)
- }
-}
-
-func (cr *Revision) hashContent() string {
- var buffer bytes.Buffer
- var childrenKeys []string
-
- if cr.Config != nil {
- buffer.WriteString(cr.Config.Hash)
- }
-
- for key, _ := range cr.Children {
- childrenKeys = append(childrenKeys, key)
- }
- sort.Strings(childrenKeys)
-
- if cr.Children != nil && len(cr.Children) > 0 {
- // Loop through sorted Children keys
- for _, key := range childrenKeys {
- for _, child := range cr.Children[key] {
- buffer.WriteString(child.Hash)
- }
- }
- }
-
- return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
-}
-
-func (cr *Revision) getData() interface{} {
- if cr.Config == nil {
- return nil
- }
- return cr.Config.Data
-}
-
-func (cr *Revision) getNode() *Node {
- return cr.branch.node
-}
-
-func (cr *Revision) getType() reflect.Type {
- // TODO: what is this returning really?
- return reflect.TypeOf(cr.getData())
-}
-
-func (cr *Revision) clearHash() {
- cr.Hash = ""
-}
-
-func (cr *Revision) Get(depth int) interface{} {
- originalData := cr.getData()
- data := Clone(originalData)
-
- if depth > 0 {
- for fieldName, field := range ChildrenFields(cr.getType()) {
- if field.IsContainer {
- for _, rev := range cr.Children[fieldName] {
- childData := rev.Get(depth - 1)
- childDataHolder := GetAttributeValue(data, fieldName, 0)
- // TODO: merge with childData
- fmt.Printf("data:%+v, dataHolder:%+v", childData, childDataHolder)
- }
- } else {
- rev := cr.Children[fieldName][0]
- childData := rev.Get(depth - 1)
- childDataHolder := GetAttributeValue(data, fieldName, 0)
- // TODO: merge with childData
- fmt.Printf("data:%+v, dataHolder:%+v", childData, childDataHolder)
- }
- }
- }
- return data
-}
-
-func (cr *Revision) UpdateData(data interface{}, branch *Branch) *Revision {
- newRev := Clone(cr).(*Revision)
- newRev.branch = branch
- newRev.Config = data.(*DataRevision)
- newRev.finalize()
-
- return newRev
-}
-
-func (cr *Revision) UpdateChildren(name string, children []*Revision, branch *Branch) *Revision {
- newChildren := Clone(cr.Children).(map[string][]*Revision)
- newChildren[name] = children
- newRev := Clone(cr).(*Revision)
- newRev.branch = branch
- newRev.Children = newChildren
- newRev.finalize()
-
- return newRev
-}
-
-func (cr *Revision) UpdateAllChildren(children map[string][]*Revision, branch *Branch) *Revision {
- newRev := Clone(cr).(*Revision)
- newRev.branch = branch
- newRev.Children = children
- newRev.finalize()
-
- return newRev
+type Revision interface { // Revision is the contract shared by the revision implementations in this package.
+ Finalize() // completes construction; NonPersistedRevision computes its hash and registers itself in RevisionCache
+ SetConfig(revision *DataRevision) // replaces the data configuration
+ GetConfig() *DataRevision // returns the data configuration
+ SetChildren(children map[string][]Revision) // replaces the child revisions, keyed by field name
+ GetChildren() map[string][]Revision // returns the child revisions, keyed by field name
+ SetHash(hash string) // overwrites the stored hash
+ GetHash() string // returns the stored hash
+ ClearHash() // resets the stored hash
+ SetBranch(branch *Branch) // attaches the revision to branch
+ GetBranch() *Branch // returns the branch the revision is attached to
+ Get(int) interface{} // returns the data; a non-zero depth also merges in child data, one level per unit
+ GetData() interface{} // returns the raw data held by the configuration
+ GetNode() *Node // returns the node that owns the revision's branch
+ UpdateData(data interface{}, branch *Branch) Revision // returns a new revision on branch carrying data
+ UpdateChildren(name string, children []Revision, branch *Branch) Revision // returns a new revision on branch with the children of field name replaced
+ UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision // returns a new revision on branch with all children replaced
}
diff --git a/db/model/revision_test.go b/db/model/revision_test.go
index e7da6a5..5a88ff3 100644
--- a/db/model/revision_test.go
+++ b/db/model/revision_test.go
@@ -16,15 +16,15 @@
package model
import (
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/voltha"
"testing"
)
func Test_Revision_01_New(t *testing.T) {
branch := &Branch{}
data := &voltha.Device{}
- children := make(map[string][]*Revision)
- rev := NewRevision(branch, data, children)
+ children := make(map[string][]Revision)
+ rev := NewNonPersistedRevision(branch, data, children)
t.Logf("New revision created: %+v\n", rev)
}
diff --git a/db/model/root.go b/db/model/root.go
index 27a6746..5adb99d 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -21,6 +21,8 @@
"fmt"
"github.com/google/uuid"
"reflect"
+ "time"
+ "github.com/opencord/voltha-go/common/log"
)
type Root struct {
@@ -29,8 +31,8 @@
KvStore *Backend
Loading bool
RevisionClass interface{}
- Callbacks []func() interface{}
- NotificationCallbacks []func() interface{}
+ Callbacks []func()
+ NotificationCallbacks []func()
}
func NewRoot(initialData interface{}, kvStore *Backend, revisionClass interface{}) *Root {
@@ -38,24 +40,27 @@
root.KvStore = kvStore
root.DirtyNodes = make(map[string]*Node)
root.Loading = false
- if kvStore != nil /*&& FIXME: RevisionClass is a subclass of PersistedConfigRevision */ {
+ if kvStore != nil /*&& TODO: RevisionClass is not a subclass of PersistedRevision ??? */ {
revisionClass = reflect.TypeOf(PersistedRevision{})
}
root.RevisionClass = revisionClass
- root.Callbacks = []func() interface{}{}
- root.NotificationCallbacks = []func() interface{}{}
+ root.Callbacks = []func(){}
+ root.NotificationCallbacks = []func(){}
root.Node = NewNode(root, initialData, false, "")
return root
}
-func (r *Root) makeRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
+// MakeRevision builds the revision flavour selected by r.RevisionClass: a
+// persisted revision when it holds the reflect.Type of PersistedRevision, and
+// a non-persisted revision in every other case.
+func (r *Root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ // Comma-ok assertion: RevisionClass is an interface{} that NewRoot only
+ // overrides when a kvStore is supplied, so it may be nil or hold something
+ // other than a reflect.Type; a bare assertion would panic in that case.
+ if class, ok := r.RevisionClass.(reflect.Type); ok && class == reflect.TypeOf(PersistedRevision{}) {
+ return NewPersistedRevision(branch, data, children)
+ }
- return &Revision{}
+ return NewNonPersistedRevision(branch, data, children)
}
-func (r *Root) makeTxBranch() string {
+func (r *Root) MakeTxBranch() string {
txid_bin, _ := uuid.New().MarshalBinary()
txid := hex.EncodeToString(txid_bin)[:12]
r.DirtyNodes[txid] = r.Node
@@ -63,21 +68,20 @@
return txid
}
-func (r *Root) deleteTxBranch(txid string) {
+func (r *Root) DeleteTxBranch(txid string) {
for _, dirtyNode := range r.DirtyNodes {
dirtyNode.deleteTxBranch(txid)
}
delete(r.DirtyNodes, txid)
}
-func (r *Root) foldTxBranch(txid string) {
- // TODO: implement foldTxBranch
- // if err := r.Node.mergeTxBranch(txid, dryRun=true); err != nil {
- // r.deleteTxBranch(txid)
- // } else {
- // r.Node.mergeTxBranch(txid)
- // r.executeCallbacks()
- // }
+// FoldTxBranch merges the transaction branch txid into the root node. The
+// merge is first attempted as a dry run; if that fails the transaction branch
+// is deleted, otherwise the merge is applied and the callbacks are executed.
+func (r *Root) FoldTxBranch(txid string) {
+ // Second argument true requests the dry run.
+ if dryRunErr := r.Node.mergeTxBranch(txid, true); dryRunErr != nil {
+ r.DeleteTxBranch(txid)
+ return
+ }
+ r.Node.mergeTxBranch(txid, false)
+ r.executeCallbacks()
}
func (r *Root) executeCallbacks() {
@@ -97,16 +101,15 @@
return len(r.Callbacks) == 0
}
-func (r *Root) addCallback(callback func() interface{}) {
+func (r *Root) addCallback(callback func()) {
r.Callbacks = append(r.Callbacks, callback)
}
-func (r *Root) addNotificationCallback(callback func() interface{}) {
+func (r *Root) addNotificationCallback(callback func()) {
r.NotificationCallbacks = append(r.NotificationCallbacks, callback)
}
-func (r *Root) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) *Revision {
- var result *Revision
- // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+func (r *Root) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) Revision {
+ var result Revision
if makeBranch == nil {
// TODO: raise error
@@ -133,9 +136,8 @@
return result
}
-func (r *Root) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) *Revision {
- var result *Revision
- // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+func (r *Root) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) Revision {
+ var result Revision
if makeBranch == nil {
// TODO: raise error
@@ -162,9 +164,8 @@
return result
}
-func (r *Root) Remove(path string, txid string, makeBranch t_makeBranch) *Revision {
- var result *Revision
- // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+func (r *Root) Remove(path string, txid string, makeBranch t_makeBranch) Revision {
+ var result Revision
if makeBranch == nil {
// TODO: raise error
@@ -199,13 +200,36 @@
return r
}
+// MakeLatest delegates to the root node's MakeLatest and then, when a kv store
+// is configured and the branch is not a transaction branch (empty Txid),
+// writes the branch's latest hash and the tag hashes under the "root" key as
+// JSON. Marshal and store failures are logged; nothing is returned.
+func (r *Root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement map[CallbackType][]interface{}) {
+ r.Node.MakeLatest(branch, revision, changeAnnouncement)
+
+ // Only non-transactional changes are persisted.
+ if r.KvStore == nil || branch.Txid != "" {
+ return
+ }
+
+ tags := make(map[string]string, len(r.Tags))
+ for k, v := range r.Tags {
+ tags[k] = v.GetHash()
+ }
+ data := &rootData{
+ Latest: branch.Latest.GetHash(),
+ Tags: tags,
+ }
+
+ blob, err := json.Marshal(data)
+ if err != nil {
+ // Previously dropped silently; report it and skip the store write.
+ log.Errorf("failed to marshal root data - err: %s", err.Error())
+ return
+ }
+
+ log.Debugf("Changing root to : %s", string(blob))
+ if err := r.KvStore.Put("root", blob); err != nil {
+ log.Errorf("failed to properly put value in kvstore - err: %s", err.Error())
+ }
+}
+
func (r *Root) LoadLatest(hash string) {
r.Node.LoadLatest(r.KvStore, hash)
}
type rootData struct {
- Latest string `json:GetLatest`
- Tags map[string]string `json:Tags`
+ Latest string `json:latest`
+ Tags map[string]string `json:tags`
}
func (r *Root) loadFromPersistence(rootClass interface{}) {
@@ -214,10 +238,12 @@
r.Loading = true
blob, _ := r.KvStore.Get("root")
+ start := time.Now()
if err := json.Unmarshal(blob.Value.([]byte), &data); err != nil {
fmt.Errorf("problem to unmarshal blob - error:%s\n", err.Error())
}
-
+ stop := time.Now()
+ GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
for tag, hash := range data.Tags {
r.Node.LoadLatest(r.KvStore, hash)
r.Node.Tags[tag] = r.Node.Latest()
diff --git a/db/model/root_test.go b/db/model/root_test.go
index de58800..1887a0a 100644
--- a/db/model/root_test.go
+++ b/db/model/root_test.go
@@ -18,7 +18,7 @@
import (
"encoding/json"
"fmt"
- "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/opencord/voltha-go/protos/voltha"
"testing"
"time"
)
@@ -44,11 +44,11 @@
r := root.Load(msgClass)
afterLoad := time.Now()
- fmt.Printf(">>>>>>>>>>>>> Time to load : %f\n", afterLoad.Sub(start).Seconds())
+ fmt.Printf(">>>>>>>>>>>>> Time to Load : %f\n", afterLoad.Sub(start).Seconds())
d := r.Node.Get(deviceProxy, "", 0, false, "")
afterGet := time.Now()
- fmt.Printf(">>>>>>>>>>>>> Time to load and get: %f\n", afterGet.Sub(start).Seconds())
+ fmt.Printf(">>>>>>>>>>>>> Time to Load and get: %f\n", afterGet.Sub(start).Seconds())
jr, _ := json.Marshal(r)
fmt.Printf("Content of ROOT --> \n%s\n", jr)
diff --git a/db/model/transaction.go b/db/model/transaction.go
index b2afe83..1bed0d1 100644
--- a/db/model/transaction.go
+++ b/db/model/transaction.go
@@ -29,34 +29,34 @@
}
return tx
}
-func (t *Transaction) Get(path string, depth int, deep bool) *Revision {
+func (t *Transaction) Get(path string, depth int, deep bool) Revision {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
// TODO: need to review the return values at the different layers!!!!!
- return t.proxy.Get(path, depth, deep, t.txid).(*Revision)
+ return t.proxy.Get(path, depth, deep, t.txid).(Revision)
}
-func (t *Transaction) Update(path string, data interface{}, strict bool) *Revision {
+func (t *Transaction) Update(path string, data interface{}, strict bool) Revision {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Update(path, data, strict, t.txid).(*Revision)
+ return t.proxy.Update(path, data, strict, t.txid).(Revision)
}
-func (t *Transaction) Add(path string, data interface{}) *Revision {
+func (t *Transaction) Add(path string, data interface{}) Revision {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Add(path, data, t.txid).(*Revision)
+ return t.proxy.Add(path, data, t.txid).(Revision)
}
-func (t *Transaction) Remove(path string) *Revision {
+func (t *Transaction) Remove(path string) Revision {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Remove(path, t.txid).(*Revision)
+ return t.proxy.Remove(path, t.txid).(Revision)
}
func (t *Transaction) Cancel() {
t.proxy.cancelTransaction(t.txid)
diff --git a/db/model/utils.go b/db/model/utils.go
index 497b3f6..65c1c4a 100644
--- a/db/model/utils.go
+++ b/db/model/utils.go
@@ -129,22 +129,24 @@
// FIXME: Need to figure out if GetAttributeValue and GetAttributeStructure can become one
// Code is repeated in both, but outputs have a different purpose
// Left as-is for now to get things working
-func GetAttributeValue(data interface{}, name string, depth int) reflect.Value {
- var result reflect.Value
+func GetAttributeValue(data interface{}, name string, depth int) (string, reflect.Value) {
+ var attribName string
+ var attribValue reflect.Value
obj := reflect.ValueOf(data)
if !obj.IsValid() {
- return result
+ return attribName, attribValue
}
k := obj.Kind()
switch k {
case reflect.Ptr:
- t := obj.Type().Elem()
- n := reflect.New(t)
+ if obj.IsNil() {
+ return attribName, attribValue
+ }
- if rc := GetAttributeValue(n.Elem().Interface(), name, depth+1); rc.IsValid() {
- return rc
+ if attribName, attribValue = GetAttributeValue(obj.Elem().Interface(), name, depth+1); attribValue.IsValid() {
+ return attribName, attribValue
}
case reflect.Struct:
@@ -152,12 +154,12 @@
json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
if json[0] == name {
- return obj.Field(i)
+ return obj.Type().Field(i).Name, obj.Field(i)
}
if obj.Field(i).IsValid() {
- if rc := GetAttributeValue(obj.Field(i).Interface(), name, depth+1); rc.IsValid() {
- return rc
+ if attribName, attribValue = GetAttributeValue(obj.Field(i).Interface(), name, depth+1); attribValue.IsValid() {
+ return attribName, attribValue
}
}
}
@@ -168,15 +170,15 @@
n.Elem().Set(s)
for i := 0; i < obj.Len(); i += 1 {
- if rc := GetAttributeValue(obj.Index(i).Interface(), name, depth+1); rc.IsValid() {
- return rc
+ if attribName, attribValue = GetAttributeValue(obj.Index(i).Interface(), name, depth+1); attribValue.IsValid() {
+ return attribName, attribValue
}
}
default:
//fmt.Printf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
}
- return result
+ return attribName, attribValue
}
@@ -234,7 +236,7 @@
}
-func Clone(a interface{}) interface{} {
+func Clone2(a interface{}) interface{} {
b := reflect.ValueOf(a)
buff := new(bytes.Buffer)
enc := gob.NewEncoder(buff)
@@ -244,3 +246,12 @@
return b.Interface()
}
+
+func Clone(a, b interface{}) interface{} {
+ buff := new(bytes.Buffer)
+ enc := gob.NewEncoder(buff)
+ dec := gob.NewDecoder(buff)
+ enc.Encode(a)
+ dec.Decode(b)
+ return b
+}
diff --git a/db/model/utils_test.go b/db/model/utils_test.go
new file mode 100644
index 0000000..caf540c
--- /dev/null
+++ b/db/model/utils_test.go
@@ -0,0 +1,25 @@
+package model
+
+import (
+ "testing"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "reflect"
+)
+
+func Test_Utils_Clone(t *testing.T) {
+ a := &voltha.Device{
+ Id: "abcde",
+ FirmwareVersion: "someversion",
+ }
+ b:= &voltha.Device{}
+ Clone(reflect.ValueOf(a).Interface(), b)
+ t.Logf("A: %+v, B: %+v", a, b)
+ b.Id = "12345"
+ t.Logf("A: %+v, B: %+v", a, b)
+
+ var c *voltha.Device
+ c = Clone2(a).(*voltha.Device)
+ t.Logf("A: %+v, C: %+v", a, c)
+ c.Id = "12345"
+ t.Logf("A: %+v, C: %+v", a, c)
+}