VOL-1027 : Initial commit of voltha 2.0 data model
Change-Id: Ib8006de1af2166281ccf1c9d7c2b9156991bf4e4
diff --git a/db/model/backend.go b/db/model/backend.go
new file mode 100644
index 0000000..c327fc3
--- /dev/null
+++ b/db/model/backend.go
@@ -0,0 +1,63 @@
+package model
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "strconv"
+)
+
+//TODO: missing cache stuff
+//TODO: missing retry stuff
+//TODO: missing proper logging
+
+type Backend struct {
+ Client kvstore.Client
+ StoreType string
+ Host string
+ Port int
+ Timeout int
+ PathPrefix string
+}
+
+func NewBackend(storeType string, host string, port int, timeout int, pathPrefix string) *Backend {
+ var err error
+
+ b := &Backend{
+ StoreType: storeType,
+ Host: host,
+ Port: port,
+ Timeout: timeout,
+ PathPrefix: pathPrefix,
+ }
+
+ address := host + ":" + strconv.Itoa(port)
+ if b.Client, err = b.newClient(address, timeout); err != nil {
+ fmt.Printf("failed to create a new kv Client - %s\n", err.Error())
+ }
+
+ return b
+}
+
+func (b *Backend) newClient(address string, timeout int) (kvstore.Client, error) {
+ switch b.StoreType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("Unsupported KV store")
+}
+
+func (b *Backend) makePath(key string) string {
+ return fmt.Sprintf("%s/%s", b.PathPrefix, key)
+}
+func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
+ return b.Client.Get(b.makePath(key), b.Timeout)
+}
+func (b *Backend) Put(key string, value interface{}) error {
+ return b.Client.Put(b.makePath(key), value, b.Timeout)
+}
+func (b *Backend) Delete(key string) error {
+ return b.Client.Delete(b.makePath(key), b.Timeout)
+}
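For reference, a minimal round-trip sketch of the Backend API above, written as if it lived inside the model package; the etcd endpoint is illustrative and any reachable etcd or consul instance would do:

func exampleBackendRoundTrip() {
	// Backend wraps a kvstore client (etcd or consul) behind a path prefix.
	backend := NewBackend("etcd", "localhost", 2379, 5, "backend/example")

	// Keys are namespaced under PathPrefix, so this writes "backend/example/device/1".
	if err := backend.Put("device/1", "some-value"); err != nil {
		fmt.Printf("put failed: %s\n", err.Error())
		return
	}

	if pair, err := backend.Get("device/1"); err == nil {
		fmt.Printf("key:%s value:%s\n", pair.Key, pair.Value)
	}
}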
diff --git a/db/model/backend_test.go b/db/model/backend_test.go
new file mode 100644
index 0000000..f51351c
--- /dev/null
+++ b/db/model/backend_test.go
@@ -0,0 +1,114 @@
+package model
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+)
+
+const (
+ ETCD_KV = "etcd"
+ CONSUL_KV = "consul"
+ INVALID_KV = "invalid"
+
+ etcd_host = "10.104.149.247"
+ etcd_port = 2379
+
+ /*
+ To debug locally with the remote ETCD container
+
+ ssh -f -N vagrant@10.100.198.220 -L 22379:10.104.149.247:2379
+ */
+ //etcd_host = "localhost"
+ //etcd_port = 22379
+ consul_host = "k8s-consul"
+ consul_port = 30080
+ timeout = 5
+ prefix = "backend/test"
+ key = "stephane/1"
+ value = "barbarie"
+)
+
+var (
+ etcd_backend *Backend
+ consul_backend *Backend
+)
+
+func Test_Etcd_Backend_New(t *testing.T) {
+ etcd_backend = NewBackend(ETCD_KV, etcd_host, etcd_port, timeout, prefix)
+}
+
+func Test_Etcd_Backend_Put(t *testing.T) {
+ etcd_backend.Put(key, value)
+
+}
+
+func Test_Etcd_Backend_Get(t *testing.T) {
+ if pair, err := etcd_backend.Client.Get("service/voltha/data/core/0001/root", timeout); err != nil {
+ t.Errorf("backend get failed - %s", err.Error())
+ } else {
+ j, _ := json.Marshal(pair)
+ t.Logf("pair: %s", string(j))
+ }
+}
+
+func Test_Etcd_Backend_GetRoot(t *testing.T) {
+ if pair, err := etcd_backend.Get(key); err != nil {
+ t.Errorf("backend get failed - %s", err.Error())
+ } else {
+ j, _ := json.Marshal(pair)
+ t.Logf("pair: %s", string(j))
+ if pair.Key != (prefix + "/" + key) {
+ t.Errorf("backend key differs - key: %s, expected: %s", pair.Key, key)
+ }
+
+ s := fmt.Sprintf("%s", pair.Value)
+ if s != value {
+ t.Errorf("backend value differs - value: %s, expected:%s", pair.Value, value)
+ }
+ }
+}
+
+func Test_Etcd_Backend_Delete(t *testing.T) {
+ if err := etcd_backend.Delete(key); err != nil {
+ t.Errorf("backend delete failed - %s", err.Error())
+ }
+ //if _, err := backend.Client.Get(key, Timeout); err == nil {
+ // t.Errorf("backend delete failed - %s", err.Error())
+ //}
+}
+
+func Test_Consul_Backend_New(t *testing.T) {
+ consul_backend = NewBackend(CONSUL_KV, consul_host, consul_port, timeout, prefix)
+}
+
+func Test_Consul_Backend_Put(t *testing.T) {
+ consul_backend.Put(key, value)
+
+}
+
+func Test_Consul_Backend_Get(t *testing.T) {
+ if pair, err := consul_backend.Get(key); err != nil {
+ t.Errorf("backend get failed - %s", err.Error())
+ } else {
+ j, _ := json.Marshal(pair)
+ t.Logf("pair: %s", string(j))
+ if pair.Key != (prefix + "/" + key) {
+ t.Errorf("backend key differs - key: %s, expected: %s", pair.Key, key)
+ }
+
+ v := fmt.Sprintf("%s", pair.Value)
+ if v != value {
+ t.Errorf("backend value differs - value: %s, expected:%s", pair.Value, value)
+ }
+ }
+}
+
+func Test_Consul_Backend_Delete(t *testing.T) {
+ if err := consul_backend.Delete(key); err != nil {
+ t.Errorf("backend delete failed - %s", err.Error())
+ }
+ //if _, err := backend.Client.Get(key, Timeout); err == nil {
+ // t.Errorf("backend delete failed - %s", err.Error())
+ //}
+}
diff --git a/db/model/branch.go b/db/model/branch.go
new file mode 100644
index 0000000..8a37157
--- /dev/null
+++ b/db/model/branch.go
@@ -0,0 +1,34 @@
+package model
+
+// TODO: implement weak references or something equivalent
+// TODO: missing proper logging
+
+type Branch struct {
+ node *Node
+ Txid string
+ origin *Revision
+ revisions map[string]*Revision
+ Latest *Revision
+}
+
+func NewBranch(node *Node, txid string, origin *Revision, autoPrune bool) *Branch {
+ cb := &Branch{}
+ cb.node = node
+ cb.Txid = txid
+ cb.origin = origin
+ cb.revisions = make(map[string]*Revision)
+ cb.Latest = origin
+
+ return cb
+}
+
+// TODO: Check if the following are required
+func (cb *Branch) get(hash string) *Revision {
+ return cb.revisions[hash]
+}
+func (cb *Branch) GetLatest() *Revision {
+ return cb.Latest
+}
+func (cb *Branch) GetOrigin() *Revision {
+ return cb.origin
+}
diff --git a/db/model/branch_test.go b/db/model/branch_test.go
new file mode 100644
index 0000000..4016b70
--- /dev/null
+++ b/db/model/branch_test.go
@@ -0,0 +1,56 @@
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "testing"
+)
+
+var (
+ BRANCH *Branch
+ HASH string
+)
+
+func Test_ConfigBranch_New(t *testing.T) {
+ node := &Node{}
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("origin_hash")))
+ origin := &Revision{
+ Config: &DataRevision{},
+ Children: make(map[string][]*Revision),
+ Hash: hash,
+ branch: &Branch{},
+ WeakRef: "need to fix this",
+ }
+ txid := fmt.Sprintf("%x", md5.Sum([]byte("branch_transaction_id")))
+
+ BRANCH = NewBranch(node, txid, origin, true)
+
+ t.Logf("New branch created: %+v\n", BRANCH)
+}
+
+func Test_ConfigBranch_AddRevision(t *testing.T) {
+ HASH = fmt.Sprintf("%x", md5.Sum([]byte("revision_hash")))
+ rev := &Revision{
+ Config: &DataRevision{},
+ Children: make(map[string][]*Revision),
+ Hash: HASH,
+ branch: &Branch{},
+ WeakRef: "need to fix this",
+ }
+
+ BRANCH.revisions[HASH] = rev
+ t.Logf("Added revision: %+v\n", rev)
+}
+
+func Test_ConfigBranch_GetRevision(t *testing.T) {
+ rev := BRANCH.get(HASH)
+ t.Logf("Got revision for hash:%s rev:%+v\n", HASH, rev)
+}
+func Test_ConfigBranch_LatestRevision(t *testing.T) {
+ rev := BRANCH.GetLatest()
+ t.Logf("Got GetLatest revision:%+v\n", rev)
+}
+func Test_ConfigBranch_OriginRevision(t *testing.T) {
+ rev := BRANCH.origin
+ t.Logf("Got origin revision:%+v\n", rev)
+}
diff --git a/db/model/callback_type.go b/db/model/callback_type.go
new file mode 100644
index 0000000..42ff608
--- /dev/null
+++ b/db/model/callback_type.go
@@ -0,0 +1,29 @@
+package model
+
+type CallbackType uint8
+
+const (
+ GET CallbackType = iota
+ PRE_UPDATE
+ POST_UPDATE
+ PRE_ADD
+ POST_ADD
+ PRE_REMOVE
+ POST_REMOVE
+ POST_LISTCHANGE
+)
+
+var enumCallbackTypes = []string{
+ "GET",
+ "PRE_UPDATE",
+ "POST_UPDATE",
+ "PRE_ADD",
+ "POST_ADD",
+ "PRE_REMOVE",
+ "POST_REMOVE",
+ "POST_LISTCHANGE",
+}
+
+func (t CallbackType) String() string {
+ return enumCallbackTypes[t]
+}
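The callback type doubles as its own label through the Stringer interface, which the event bus and the placeholder log messages rely on; a tiny illustration:

func exampleCallbackType() {
	// The enum values map positionally onto enumCallbackTypes.
	fmt.Println(POST_ADD.String()) // prints "POST_ADD"
	fmt.Printf("%s\n", PRE_REMOVE) // fmt picks up the Stringer as well
}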
diff --git a/db/model/child_type.go b/db/model/child_type.go
new file mode 100644
index 0000000..73f79a3
--- /dev/null
+++ b/db/model/child_type.go
@@ -0,0 +1,107 @@
+package model
+
+import (
+ "fmt"
+ desc "github.com/golang/protobuf/descriptor"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/opencord/voltha/protos/go/common"
+ "reflect"
+ "strconv"
+ "sync"
+)
+
+type singleton struct {
+ ChildrenFieldsCache map[interface{}]map[string]*ChildType
+}
+
+var instance *singleton
+var once sync.Once
+
+func GetInstance() *singleton {
+ once.Do(func() {
+ instance = &singleton{}
+ })
+ return instance
+}
+
+type ChildType struct {
+ ClassModule string
+ ClassType reflect.Type
+ IsContainer bool
+ Key string
+ KeyFromStr func(s string) interface{}
+}
+
+func ChildrenFields(cls interface{}) map[string]*ChildType {
+ if cls == nil {
+ return nil
+ }
+ var names map[string]*ChildType
+ var names_exist bool
+
+ if GetInstance().ChildrenFieldsCache == nil {
+ GetInstance().ChildrenFieldsCache = make(map[interface{}]map[string]*ChildType)
+ }
+
+ msgType := reflect.TypeOf(cls)
+
+ if names, names_exist = GetInstance().ChildrenFieldsCache[msgType.String()]; !names_exist {
+ names = make(map[string]*ChildType)
+
+ _, md := desc.ForMessage(cls.(desc.Message))
+
+ // TODO: Do we need to validate MD for nil, panic or exception?
+ for _, field := range md.Field {
+ if options := field.GetOptions(); options != nil {
+ if proto.HasExtension(options, common.E_ChildNode) {
+ isContainer := *field.Label == descriptor.FieldDescriptorProto_LABEL_REPEATED
+ meta, _ := proto.GetExtension(options, common.E_ChildNode)
+ var keyFromStr func(string) interface{}
+
+ if meta.(*common.ChildNode).GetKey() == "" {
+ fmt.Println("Child key is empty ... moving on")
+ } else {
+ parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
+ keyType := FindKeyOwner(reflect.New(parentType).Elem().Interface(), meta.(*common.ChildNode).GetKey(), 0)
+
+ switch keyType.(reflect.Type).Name() {
+ case "string":
+ keyFromStr = func(s string) interface{} {
+ return s
+ }
+ case "int32":
+ fallthrough
+ case "int64":
+ fallthrough
+ case "uint32":
+ fallthrough
+ case "uint64":
+ keyFromStr = func(s string) interface{} {
+ i, _ := strconv.Atoi(s)
+ return i
+ }
+ default:
+ fmt.Printf("key type not implemented - type: %s\n", keyType.(reflect.Type))
+ }
+
+ ct := ChildType{
+ ClassModule: parentType.String(),
+ ClassType: parentType,
+ IsContainer: isContainer,
+ Key: meta.(*common.ChildNode).GetKey(),
+ KeyFromStr: keyFromStr,
+ }
+
+ names[field.GetName()] = &ct
+
+ }
+ }
+ }
+ }
+
+ GetInstance().ChildrenFieldsCache[msgType.String()] = names
+ }
+
+ return names
+}
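ChildrenFields drives the tree construction: it walks the protobuf descriptor of a message and returns one ChildType per field carrying the child_node option. A sketch of how the result is typically inspected, sitting next to child_type_test.go which already imports the voltha protos; the "ports"/"port_no" names assume the Device message keeps the child_node annotations it has in the voltha protos:

func exampleChildrenFields() {
	// Map keys are proto field names; only (child_node) annotated fields appear.
	fields := ChildrenFields(&voltha.Device{})

	for name, child := range fields {
		fmt.Printf("field:%s container:%t key:%s\n", name, child.IsContainer, child.Key)
	}

	// For a keyed container, KeyFromStr converts a path segment back into the
	// key's native type (an int for the uint32 port_no field).
	if ports, ok := fields["ports"]; ok {
		fmt.Printf("ports key \"123\" -> %v\n", ports.KeyFromStr("123"))
	}
}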
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
new file mode 100644
index 0000000..6996a6d
--- /dev/null
+++ b/db/model/child_type_test.go
@@ -0,0 +1,57 @@
+package model
+
+import (
+ "fmt"
+ "github.com/opencord/voltha/protos/go/voltha"
+ "reflect"
+ "testing"
+)
+
+/*
+
+ */
+func Test_ChildType_01_CacheIsEmpty(t *testing.T) {
+ if GetInstance().ChildrenFieldsCache != nil || len(GetInstance().ChildrenFieldsCache) > 0 {
+ t.Errorf("GetInstance().ChildrenFieldsCache should be empty: %+v\n", GetInstance().ChildrenFieldsCache)
+ }
+ t.Logf("GetInstance().ChildrenFieldsCache is empty - %+v\n", GetInstance().ChildrenFieldsCache)
+}
+
+/*
+
+ */
+func Test_ChildType_02_Device_Proto_ChildrenFields(t *testing.T) {
+
+ var cls *voltha.Device
+ //cls = &voltha.Device{Id:"testing-Config-rev-id"}
+
+ names := ChildrenFields(cls)
+
+ tst := reflect.ValueOf(cls).Elem().FieldByName("ImageDownloads")
+
+ fmt.Printf("############ Field by name : %+v\n", reflect.TypeOf(tst.Interface()))
+
+ if names == nil || len(names) == 0 {
+ t.Errorf("ChildrenFields failed to return names: %+v\n", names)
+ }
+ t.Logf("ChildrenFields returned names: %+v\n", names)
+}
+
+/*
+
+ */
+func Test_ChildType_03_CacheHasOneEntry(t *testing.T) {
+ if GetInstance().ChildrenFieldsCache == nil || len(GetInstance().ChildrenFieldsCache) != 1 {
+ t.Errorf("GetInstance().ChildrenFieldsCache should have one entry: %+v\n", GetInstance().ChildrenFieldsCache)
+ }
+ t.Logf("GetInstance().ChildrenFieldsCache has one entry: %+v\n", GetInstance().ChildrenFieldsCache)
+}
+
+/*
+
+ */
+func Test_ChildType_04_ChildrenFieldsCache_Keys(t *testing.T) {
+ for k := range GetInstance().ChildrenFieldsCache {
+ t.Logf("GetInstance().ChildrenFieldsCache Key:%+v\n", k)
+ }
+}
diff --git a/db/model/data_revision.go b/db/model/data_revision.go
new file mode 100644
index 0000000..deceb87
--- /dev/null
+++ b/db/model/data_revision.go
@@ -0,0 +1,48 @@
+package model
+
+import (
+ "bytes"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "reflect"
+)
+
+type DataRevision struct {
+ Data interface{}
+ Hash string
+}
+
+func NewDataRevision(data interface{}) *DataRevision {
+ cdr := &DataRevision{}
+ cdr.Data = data
+ cdr.Hash = cdr.hashData(data)
+
+ return cdr
+}
+
+func (cr *DataRevision) hashData(data interface{}) string {
+ var buffer bytes.Buffer
+
+ if IsProtoMessage(data) {
+ if pbdata, err := proto.Marshal(data.(proto.Message)); err != nil {
+ fmt.Printf("problem marshalling protobuf data - err: %s\n", err.Error())
+ } else {
+ buffer.Write(pbdata)
+ }
+
+ } else if reflect.ValueOf(data).IsValid() {
+ dataObj := reflect.New(reflect.TypeOf(data).Elem())
+ if json, err := json.Marshal(dataObj.Interface()); err != nil {
+ fmt.Printf("problem marshalling data - err: %s\n", err.Error())
+ } else {
+ buffer.Write(json)
+ }
+ } else {
+ dataObj := reflect.New(reflect.TypeOf(data).Elem())
+ buffer.Write(dataObj.Bytes())
+ }
+
+ return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
+}
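A data revision ties a piece of config data to a stable identifier; the hash is the first 12 hex characters of the md5 of the marshalled payload, so identical data always yields the identical hash. A short sketch, with the voltha protos assumed available as elsewhere in this package:

func exampleDataRevision() {
	rev := NewDataRevision(&voltha.Device{Id: "example-device"})

	fmt.Printf("hash:%s len:%d\n", rev.Hash, len(rev.Hash))
}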
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
new file mode 100644
index 0000000..a833d58
--- /dev/null
+++ b/db/model/event_bus.go
@@ -0,0 +1,63 @@
+package model
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/opencord/voltha/protos/go/voltha"
+)
+
+type EventBus struct {
+ client *EventBusClient
+ topic string
+}
+
+var (
+ IGNORED_CALLBACKS = map[CallbackType]struct{}{
+ PRE_ADD: {},
+ GET: {},
+ POST_LISTCHANGE: {},
+ PRE_REMOVE: {},
+ PRE_UPDATE: {},
+ }
+)
+
+func NewEventBus() *EventBus {
+ bus := &EventBus{
+ client: NewEventBusClient(),
+ topic: "model-change-events",
+ }
+ return bus
+}
+
+func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+ if _, ok := IGNORED_CALLBACKS[eventType]; ok {
+ fmt.Printf("ignoring event - type:%s, data:%+v\n", eventType, data)
+ return
+ }
+ var kind voltha.ConfigEventType_ConfigEventType
+ switch eventType {
+ case POST_ADD:
+ kind = voltha.ConfigEventType_add
+ case POST_REMOVE:
+ kind = voltha.ConfigEventType_remove
+ default:
+ kind = voltha.ConfigEventType_update
+ }
+
+ var msg []byte
+ var err error
+ if IsProtoMessage(data) {
+ if msg, err = json.Marshal(data); err != nil {
+ fmt.Printf("problem marshalling data: %+v, err:%s\n", data, err.Error())
+ }
+ } else {
+ msg = data.([]byte)
+ }
+
+ event := voltha.ConfigEvent{
+ Type: kind,
+ Hash: hash,
+ Data: string(msg),
+ }
+
+ bus.client.Publish(bus.topic, event)
+}
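A sketch of how a model change would be advertised; the hash argument is illustrative. Callback types listed in IGNORED_CALLBACKS are dropped, the others are mapped onto ConfigEventType and published on the "model-change-events" topic:

func exampleEventBusAdvertise() {
	bus := NewEventBus()
	device := &voltha.Device{Id: "example-device"}

	// POST_ADD maps to ConfigEventType_add; the device is JSON serialized
	// into the event payload.
	bus.Advertise(POST_ADD, device, "0123456789ab")

	// PRE_ADD is in IGNORED_CALLBACKS and never reaches the client.
	bus.Advertise(PRE_ADD, device, "0123456789ab")
}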
diff --git a/db/model/event_bus_client.go b/db/model/event_bus_client.go
new file mode 100644
index 0000000..262b7cc
--- /dev/null
+++ b/db/model/event_bus_client.go
@@ -0,0 +1,17 @@
+package model
+
+import (
+ "fmt"
+ "github.com/opencord/voltha/protos/go/voltha"
+)
+
+type EventBusClient struct {
+}
+
+func NewEventBusClient() *EventBusClient {
+ return &EventBusClient{}
+}
+
+func (ebc *EventBusClient) Publish(topic string, event voltha.ConfigEvent) {
+ fmt.Printf("publishing event:%+v, topic:%s\n", event, topic)
+}
diff --git a/db/model/node.go b/db/model/node.go
new file mode 100644
index 0000000..936a79a
--- /dev/null
+++ b/db/model/node.go
@@ -0,0 +1,456 @@
+package model
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "reflect"
+ "strings"
+)
+
+const (
+ NONE string = "none"
+)
+
+type Node struct {
+ root *Root
+ Type interface{}
+ Branches map[string]*Branch
+ Tags map[string]*Revision
+ Proxy *Proxy
+ EventBus *EventBus
+ AutoPrune bool
+}
+
+func NewNode(root *Root, initialData interface{}, autoPrune bool, txid string) *Node {
+ cn := &Node{}
+
+ cn.root = root
+ cn.Branches = make(map[string]*Branch)
+ cn.Tags = make(map[string]*Revision)
+ cn.Proxy = nil
+ cn.EventBus = nil
+ cn.AutoPrune = autoPrune
+
+ if IsProtoMessage(initialData) {
+ cn.Type = reflect.ValueOf(initialData).Interface()
+ dataCopy := proto.Clone(initialData.(proto.Message))
+ cn.initialize(dataCopy, txid)
+ } else if reflect.ValueOf(initialData).IsValid() {
+ cn.Type = reflect.ValueOf(initialData).Interface()
+ } else {
+ // not implemented error
+ fmt.Printf("cannot process initial data - %+v\n", initialData)
+ }
+
+ return cn
+}
+
+func (cn *Node) makeNode(data interface{}, txid string) *Node {
+ return NewNode(cn.root, data, true, txid)
+}
+
+func (cn *Node) makeRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
+ return cn.root.makeRevision(branch, data, children)
+}
+
+func (cn *Node) makeLatest(branch *Branch, revision *Revision, changeAnnouncement map[string]interface{}) {
+ if _, ok := branch.revisions[revision.Hash]; !ok {
+ branch.revisions[revision.Hash] = revision
+ }
+
+ if branch.Latest == nil || revision.Hash != branch.Latest.Hash {
+ branch.Latest = revision
+ }
+
+ if changeAnnouncement != nil && branch.Txid == "" {
+ if cn.Proxy != nil {
+ for changeType, data := range changeAnnouncement {
+ // TODO: Invoke callback
+ fmt.Printf("invoking callback - changeType: %+v, data:%+v\n", changeType, data)
+ }
+ }
+
+ for changeType, data := range changeAnnouncement {
+ // TODO: send notifications
+ fmt.Printf("sending notification - changeType: %+v, data:%+v\n", changeType, data)
+ }
+ }
+}
+
+func (cn *Node) Latest() *Revision {
+ if branch, exists := cn.Branches[NONE]; exists {
+ return branch.Latest
+ }
+ return nil
+}
+
+func (cn *Node) GetHash(hash string) *Revision {
+ return cn.Branches[NONE].revisions[hash]
+}
+
+func (cn *Node) initialize(data interface{}, txid string) {
+ var children map[string][]*Revision
+ children = make(map[string][]*Revision)
+ for fieldName, field := range ChildrenFields(cn.Type) {
+ fieldValue := GetAttributeValue(data, fieldName, 0)
+
+ if fieldValue.IsValid() {
+ if field.IsContainer {
+ if field.Key != "" {
+ var keysSeen []string
+
+ for i := 0; i < fieldValue.Len(); i++ {
+ v := fieldValue.Index(i)
+ rev := cn.makeNode(v.Interface(), txid).Latest()
+ key := GetAttributeValue(v.Interface(), field.Key, 0)
+ for _, k := range keysSeen {
+ if k == key.String() {
+ fmt.Printf("duplicate key - %s\n", k)
+ }
+ }
+ children[fieldName] = append(children[fieldName], rev)
+ keysSeen = append(keysSeen, key.String())
+ }
+
+ } else {
+ for i := 0; i < fieldValue.Len(); i++ {
+ v := fieldValue.Index(i)
+ children[fieldName] = append(children[fieldName], cn.makeNode(v.Interface(), txid).Latest())
+ }
+ }
+ } else {
+ children[fieldName] = append(children[fieldName], cn.makeNode(fieldValue.Interface(), txid).Latest())
+ }
+ } else {
+ fmt.Printf("field is invalid - %+v\n", fieldValue)
+ }
+ }
+ // FIXME: ClearField??? No such method in go protos. Reset?
+ //data.ClearField(field_name)
+ branch := NewBranch(cn, "", nil, cn.AutoPrune)
+ rev := cn.makeRevision(branch, data, children)
+ cn.makeLatest(branch, rev, nil)
+ if txid == "" {
+ txid = NONE
+ }
+ cn.Branches[txid] = branch
+}
+
+func (cn *Node) makeTxBranch(txid string) *Branch {
+ branchPoint := cn.Branches[NONE].Latest
+ branch := NewBranch(cn, txid, branchPoint, true)
+ cn.Branches[txid] = branch
+ return branch
+}
+
+func (cn *Node) deleteTxBranch(txid string) {
+ delete(cn.Branches, txid)
+}
+
+type t_makeBranch func(*Node) *Branch
+
+//
+// Get operation
+//
+func (cn *Node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
+ if deep {
+ depth = -1
+ }
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ var branch *Branch
+ var rev *Revision
+
+ // FIXME: should empty txid be cleaned up?
+ if branch = cn.Branches[txid]; txid == "" || branch == nil {
+ branch = cn.Branches[NONE]
+ }
+
+ if hash != "" {
+ rev = branch.revisions[hash]
+ } else {
+ rev = branch.Latest
+ }
+
+ return cn.get(rev, path, depth)
+}
+
+func (cn *Node) findRevByKey(revs []*Revision, keyName string, value string) (int, *Revision) {
+ for i, rev := range revs {
+ dataValue := reflect.ValueOf(rev.Config.Data)
+ dataStruct := GetAttributeStructure(rev.Config.Data, keyName, 0)
+
+ fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
+
+ if fieldValue.Interface().(string) == value {
+ return i, rev
+ }
+ }
+
+ fmt.Printf("key %s=%s not found\n", keyName, value)
+
+ return -1, nil
+}
+
+func (cn *Node) get(rev *Revision, path string, depth int) interface{} {
+ if path == "" {
+ return cn.doGet(rev, depth)
+ }
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+ path = ""
+ if len(partition) == 2 {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(cn.Type)[name]
+
+ if field.IsContainer {
+ if field.Key != "" {
+ children := rev.Children[name]
+ if path != "" {
+ partition = strings.SplitN(path, "/", 2)
+ key := partition[0]
+ path = ""
+ key = field.KeyFromStr(key).(string)
+ _, childRev := cn.findRevByKey(children, field.Key, key)
+ childNode := childRev.getNode()
+ return childNode.get(childRev, path, depth)
+ } else {
+ var response []interface{}
+ for _, childRev := range children {
+ childNode := childRev.getNode()
+ value := childNode.doGet(childRev, depth)
+ response = append(response, value)
+ }
+ return response
+ }
+ } else {
+ childRev := rev.Children[name][0]
+ childNode := childRev.getNode()
+ return childNode.get(childRev, path, depth)
+ }
+ }
+ return nil
+}
+
+func (cn *Node) doGet(rev *Revision, depth int) interface{} {
+ msg := rev.Get(depth)
+
+ if cn.Proxy != nil {
+ // TODO: invoke GET callback
+ fmt.Println("invoking proxy GET Callbacks")
+ }
+ return msg
+}
+
+//
+// Update operation
+//
+func (n *Node) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) *Revision {
+ // FIXME: is this required ... a bit overkill to take out a "/"
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ var branch *Branch
+ var ok bool
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
+ branch = makeBranch(n)
+ }
+
+ if path == "" {
+ return n.doUpdate(branch, data, strict)
+ }
+ return &Revision{}
+}
+
+func (n *Node) doUpdate(branch *Branch, data interface{}, strict bool) *Revision {
+ if reflect.TypeOf(data) != reflect.TypeOf(n.Type) {
+ // TODO raise error
+ fmt.Printf("data does not match type: %+v\n", n.Type)
+ }
+
+ // TODO: noChildren?
+
+ if n.Proxy != nil {
+ // TODO: n.proxy.InvokeCallbacks(CallbackType.PRE_UPDATE, data)
+ fmt.Println("invoking proxy PRE_UPDATE Callbacks")
+ }
+ if branch.Latest.getData() != data {
+ if strict {
+ // TODO: checkAccessViolations(data, branch.GetLatest.data)
+ fmt.Println("checking access violations")
+ }
+ rev := branch.Latest.UpdateData(data, branch)
+ n.makeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.POST_UPDATE, rev.data)
+ return rev
+ } else {
+ return branch.Latest
+ }
+}
+
+// TODO: the python implementation has a method to check if the data has no children
+//func (n *SomeNode) noChildren(data interface{}) bool {
+// for fieldName, field := range ChildrenFields(n.Type) {
+// fieldValue := GetAttributeValue(data, fieldName)
+//
+// if fieldValue.IsValid() {
+// if field.IsContainer {
+// if len(fieldValue) > 0 {
+//
+// }
+// } else {
+//
+// }
+//
+// }
+// }
+//}
+
+//
+// Add operation
+//
+func (n *Node) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) *Revision {
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+ if path == "" {
+ // TODO raise error
+ fmt.Printf("cannot add for non-container mode\n")
+ }
+
+ var branch *Branch
+ var ok bool
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
+ branch = makeBranch(n)
+ }
+
+ rev := branch.Latest
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+ path = ""
+ if len(partition) == 2 {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(n.Type)[name]
+ var children []*Revision
+
+ if field.IsContainer {
+ if path == "" {
+ if field.Key != "" {
+ if n.Proxy != nil {
+ // TODO -> n.proxy.InvokeCallbacks(PRE_ADD, data)
+ fmt.Println("invoking proxy PRE_ADD Callbacks")
+ }
+
+ children = make([]*Revision, len(rev.Children[name]))
+ copy(children, rev.Children[name])
+ key := GetAttributeValue(data, field.Key, 0)
+ if _, rev := n.findRevByKey(children, field.Key, key.String()); rev != nil {
+ // TODO raise error
+ fmt.Printf("duplicate key found: %s\n", key.String())
+ }
+
+ childRev := n.makeNode(data, "").Latest()
+ children = append(children, childRev)
+ rev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, rev, nil) // TODO -> changeAnnouncement needs to be a tuple (CallbackType.POST_ADD, rev.data)
+ return rev
+ } else {
+ fmt.Printf("cannot add to non-keyed container\n")
+ }
+ } else if field.Key != "" {
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ path = ""
+ if len(partition) == 2 {
+ path = partition[1]
+ }
+ key = field.KeyFromStr(key).(string)
+ children = make([]*Revision, len(rev.Children[name]))
+ copy(children, rev.Children[name])
+ idx, childRev := n.findRevByKey(children, field.Key, key)
+ childNode := childRev.getNode()
+ newChildRev := childNode.Add(path, data, txid, makeBranch)
+ children[idx] = newChildRev
+ rev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, rev, nil)
+ return rev
+ } else {
+ fmt.Printf("cannot add to non-keyed container\n")
+ }
+ } else {
+ fmt.Printf("cannot add to non-container field\n")
+ }
+ return nil
+}
+
+//
+// Remove operation
+//
+func (n *Node) Remove(path string, txid string, makeBranch t_makeBranch) *Revision {
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+ if path == "" {
+ // TODO raise error
+ fmt.Printf("cannot remove for non-container mode\n")
+ }
+ var branch *Branch
+ var ok bool
+ if txid == "" {
+ branch = n.Branches[NONE]
+ } else if branch, ok = n.Branches[txid]; !ok {
+ branch = makeBranch(n)
+ }
+
+ rev := branch.Latest
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+ path = ""
+ if len(partition) == 2 {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(n.Type)[name]
+ var children []*Revision
+ post_anno := make(map[string]interface{})
+
+ if field.IsContainer {
+ if path == "" {
+ fmt.Printf("cannot remove without a key\n")
+ } else if field.Key != "" {
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ path = ""
+ if len(partition) == 2 {
+ path = partition[1]
+ }
+ key = field.KeyFromStr(key).(string)
+ if path != "" {
+ children = make([]*Revision, len(rev.Children[name]))
+ copy(children, rev.Children[name])
+ idx, childRev := n.findRevByKey(children, field.Key, key)
+ childNode := childRev.getNode()
+ newChildRev := childNode.Remove(path, txid, makeBranch)
+ children[idx] = newChildRev
+ rev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, rev, nil)
+ return rev
+ } else {
+ children = make([]*Revision, len(rev.Children[name]))
+ copy(children, rev.Children[name])
+ idx, childRev := n.findRevByKey(children, field.Key, key)
+ if n.Proxy != nil {
+ data := childRev.getData()
+ fmt.Println("invoking proxy PRE_REMOVE Callbacks")
+ fmt.Printf("setting POST_REMOVE Callbacks : %+v\n", data)
+ } else {
+ fmt.Println("setting POST_REMOVE Callbacks")
+ }
+ children = append(children[:idx], children[idx+1:]...)
+ rev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, rev, post_anno)
+ return rev
+ }
+ } else {
+ fmt.Printf("cannot remove from non-keyed container\n")
+ }
+ } else {
+ fmt.Printf("cannot remove from non-container field\n")
+ }
+
+ return nil
+}
+
+func (n *Node) LoadLatest(kvStore *Backend, hash string) {
+ branch := NewBranch(n, "", nil, n.AutoPrune)
+ pr := &PersistedRevision{}
+ rev := pr.load(branch, kvStore, n.Type, hash)
+ n.makeLatest(branch, rev.Revision, nil)
+ n.Branches[NONE] = branch
+}
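node_test.go below exercises construction with a fully populated Device; for orientation, the minimal wiring looks like the sketch here. Root.makeRevision is still a stub in this commit, so the resulting revisions carry no data yet, and an empty txid selects the default "none" branch:

func exampleNode() {
	root := &Root{}
	device := &voltha.Device{Id: "example-device", Type: "simulated_olt"}

	// Builds a child node per (child_node) annotated field and records the
	// initial revision on the default branch.
	node := NewNode(root, device, false, "")

	if rev := node.Latest(); rev != nil {
		fmt.Printf("latest revision: %+v\n", rev)
	}
}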
diff --git a/db/model/node_test.go b/db/model/node_test.go
new file mode 100644
index 0000000..3fa05c9
--- /dev/null
+++ b/db/model/node_test.go
@@ -0,0 +1,62 @@
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha/protos/go/bbf_fiber"
+ "github.com/opencord/voltha/protos/go/common"
+ "github.com/opencord/voltha/protos/go/openflow_13"
+ "github.com/opencord/voltha/protos/go/voltha"
+ "testing"
+)
+
+func Test_Node_01_New(t *testing.T) {
+ ports := []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "test-etcd_port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "etcd_port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+ data := &voltha.Device{
+ Id: "Config-SomeNode-01-new-test",
+ Type: "simulated_olt",
+ Root: true,
+ ParentId: "",
+ ParentPortNo: 0,
+ Vendor: "voltha-test",
+ Model: "GetLatest-voltha-simulated-olt",
+ HardwareVersion: "1.0.0",
+ FirmwareVersion: "1.0.0",
+ Images: &voltha.Images{},
+ SerialNumber: "abcdef-123456",
+ VendorId: "DEADBEEF-INC",
+ Adapter: "simulated_olt",
+ Vlan: 1234,
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ ExtraArgs: "",
+ ProxyAddress: &voltha.Device_ProxyAddress{},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ OperStatus: common.OperStatus_ACTIVE,
+ Reason: "",
+ ConnectStatus: common.ConnectStatus_REACHABLE,
+ Custom: &any.Any{},
+ Ports: ports,
+ Flows: &openflow_13.Flows{},
+ FlowGroups: &openflow_13.FlowGroups{},
+ PmConfigs: &voltha.PmConfigs{},
+ ChannelTerminations: []*bbf_fiber.ChannelterminationConfig{},
+ ImageDownloads: []*voltha.ImageDownload{},
+ }
+ root := &Root{}
+ txid := fmt.Sprintf("%x", md5.Sum([]byte("node_transaction_id")))
+
+ node := NewNode(root, data, true, txid)
+
+ t.Logf("new SomeNode created : %+v\n", node)
+}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
new file mode 100644
index 0000000..f390e0b
--- /dev/null
+++ b/db/model/persisted_revision.go
@@ -0,0 +1,150 @@
+package model
+
+import (
+ "bytes"
+ "compress/gzip"
+ "encoding/json"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "io/ioutil"
+ "reflect"
+)
+
+type PersistedRevision struct {
+ *Revision
+ Compress bool
+ kvStore *Backend
+}
+
+func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]*Revision) *PersistedRevision {
+ pr := &PersistedRevision{}
+ pr.kvStore = branch.node.root.KvStore
+ pr.Revision = NewRevision(branch, data, children)
+ return pr
+}
+
+func (pr *PersistedRevision) finalize() {
+ pr.Revision.finalize()
+ pr.store()
+}
+
+type revData struct {
+ Children map[string][]string
+ Config string
+}
+
+func (pr *PersistedRevision) store() {
+ if ok, _ := pr.kvStore.Get(pr.Revision.Hash); ok != nil {
+ return
+ }
+
+ pr.storeConfig()
+
+ childrenHashes := make(map[string][]string)
+ for fieldName, children := range pr.Children {
+ hashes := []string{}
+ for _, rev := range children {
+ hashes = append(hashes, rev.Hash)
+ }
+ childrenHashes[fieldName] = hashes
+ }
+ data := &revData{
+ Children: childrenHashes,
+ Config: pr.Config.Hash,
+ }
+ if blob, err := json.Marshal(data); err != nil {
+ // TODO report error
+ } else {
+ if pr.Compress {
+ var b bytes.Buffer
+ w := gzip.NewWriter(&b)
+ w.Write(blob)
+ w.Close()
+ blob = b.Bytes()
+ }
+ pr.kvStore.Put(pr.Hash, blob)
+ }
+}
+
+func (pr *PersistedRevision) load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) *PersistedRevision {
+ blob, _ := kvStore.Get(hash)
+ output := blob.Value.([]byte)
+ var data revData
+ if pr.Compress {
+ b := bytes.NewBuffer(blob.Value.([]byte))
+ if r, err := gzip.NewReader(b); err != nil {
+ // TODO : report error
+ } else {
+ if output, err = ioutil.ReadAll(r); err != nil {
+ // TODO report error
+ }
+ }
+ }
+ if err := json.Unmarshal(output, &data); err != nil {
+ fmt.Printf("problem unmarshalling data - %s\n", err.Error())
+ }
+
+ configHash := data.Config
+ configData := pr.loadConfig(kvStore, msgClass, configHash)
+ assembledChildren := make(map[string][]*Revision)
+
+ childrenHashes := data.Children
+ node := branch.node
+ for fieldName, child := range ChildrenFields(msgClass) {
+ var children []*Revision
+ for _, childHash := range childrenHashes[fieldName] {
+ //fmt.Printf("child class type: %+v", reflect.New(n).Elem().Interface())
+ childNode := node.makeNode(reflect.New(child.ClassType).Elem().Interface(), "")
+ childNode.LoadLatest(kvStore, childHash)
+ childRev := childNode.Latest()
+ children = append(children, childRev)
+ }
+ assembledChildren[fieldName] = children
+ }
+ rev := NewPersistedRevision(branch, configData, assembledChildren)
+ return rev
+}
+
+func (pr *PersistedRevision) storeConfig() {
+ if ok, _ := pr.kvStore.Get(pr.Config.Hash); ok != nil {
+ return
+ }
+ if blob, err := proto.Marshal(pr.Config.Data.(proto.Message)); err != nil {
+ // TODO report error
+ } else {
+ if pr.Compress {
+ var b bytes.Buffer
+ w := gzip.NewWriter(&b)
+ w.Write(blob)
+ w.Close()
+ blob = b.Bytes()
+ }
+ pr.kvStore.Put(pr.Config.Hash, blob)
+ }
+}
+
+func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
+ blob, _ := kvStore.Get(hash)
+ output := blob.Value.([]byte)
+
+ if pr.Compress {
+ b := bytes.NewBuffer(blob.Value.([]byte))
+ if r, err := gzip.NewReader(b); err != nil {
+ // TODO : report error
+ } else {
+ if output, err = ioutil.ReadAll(r); err != nil {
+ // TODO report error
+ }
+ }
+ }
+
+ var data reflect.Value
+ if msgClass != nil {
+ data = reflect.New(reflect.TypeOf(msgClass).Elem())
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ // TODO report error
+ }
+ }
+
+ return data.Interface()
+}
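To make the storage layout concrete: store() writes two keys per revision, the protobuf-encoded config blob under the config hash and a small JSON descriptor under the revision hash that references the config and the children by hash. A sketch of that descriptor, with illustrative hash values:

func exampleRevisionStorageLayout() {
	data := &revData{
		Children: map[string][]string{
			"ports": {"a1b2c3d4e5f6"},
		},
		Config: "0123456789ab",
	}

	// This is the blob that ends up under the revision hash in the KV store,
	// optionally gzip compressed when Compress is set.
	blob, _ := json.Marshal(data)
	fmt.Printf("%s\n", blob)
}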
diff --git a/db/model/proxy.go b/db/model/proxy.go
new file mode 100644
index 0000000..41255e5
--- /dev/null
+++ b/db/model/proxy.go
@@ -0,0 +1,94 @@
+package model
+
+import (
+ "context"
+ "fmt"
+ "strings"
+)
+
+type Proxy struct {
+ Root *Root
+ Node *Node
+ Path string
+ Exclusive bool
+ Callbacks []interface{}
+}
+
+func NewProxy(root *Root, node *Node, path string, exclusive bool) *Proxy {
+ p := &Proxy{
+ Root: root,
+ Node: node,
+ Exclusive: exclusive,
+ Path: path,
+ Callbacks: []interface{}{},
+ }
+ return p
+}
+
+func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
+ return p.Node.Get(path, "", depth, deep, txid)
+}
+
+func (p *Proxy) Update(path string, data interface{}, strict bool, txid string) interface{} {
+ if !strings.HasPrefix(path, "/") {
+ fmt.Printf("invalid path: %s\n", path)
+ return nil
+ }
+ var fullPath string
+ if path == "/" {
+ fullPath = p.Path
+ } else {
+ fullPath = p.Path + path
+ }
+ return p.Node.Update(fullPath, data, strict, txid, nil)
+}
+
+func (p *Proxy) Add(path string, data interface{}, txid string) interface{} {
+ if !strings.HasPrefix(path, "/") {
+ fmt.Printf("invalid path: %s\n", path)
+ return nil
+ }
+ var fullPath string
+ if path == "/" {
+ fullPath = p.Path
+ } else {
+ fullPath = p.Path + path
+ }
+ return p.Node.Add(fullPath, data, txid, nil)
+}
+
+func (p *Proxy) Remove(path string, txid string) interface{} {
+ if !strings.HasPrefix(path, "/") {
+ fmt.Printf("invalid path: %s\n", path)
+ return nil
+ }
+ var fullPath string
+ if path == "/" {
+ fullPath = p.Path
+ } else {
+ fullPath = p.Path + path
+ }
+ return p.Node.Remove(fullPath, txid, nil)
+}
+
+func (p *Proxy) openTransaction() *Transaction {
+ txid := p.Root.makeTxBranch()
+ return NewTransaction(p, txid)
+}
+
+func (p *Proxy) commitTransaction(txid string) {
+ p.Root.foldTxBranch(txid)
+}
+
+func (p *Proxy) cancelTransaction(txid string) {
+ p.Root.deleteTxBranch(txid)
+}
+
+func (p *Proxy) RegisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+}
+
+func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback func(), args ...interface{}) {
+}
+
+func (p *Proxy) InvokeCallback(callbackType CallbackType, context context.Context, proceedOnError bool) {
+}
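A proxy scopes access to one sub-tree of the data model and is what the core and adapters are expected to work against. A minimal sketch of the intended call pattern; the device path is illustrative, and since deep-path updates are still stubbed out in Node.Update the calls mainly exercise the path handling:

func exampleProxy() {
	// A nil KV backend keeps the whole model in memory.
	root := NewRoot(&voltha.Device{Id: "example-device"}, nil, nil)

	// Relative paths handed to Update/Add/Remove are prefixed with the proxy
	// path; "/" addresses the proxy root itself.
	proxy := NewProxy(root, root.Node, "/devices/example-device", false)

	proxy.Update("/", &voltha.Device{Id: "example-device"}, false, "")
	fmt.Printf("get on proxy root: %+v\n", proxy.Get("/", 0, false, ""))
}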
diff --git a/db/model/revision.go b/db/model/revision.go
new file mode 100644
index 0000000..06a04ae
--- /dev/null
+++ b/db/model/revision.go
@@ -0,0 +1,143 @@
+package model
+
+import (
+ "bytes"
+ "crypto/md5"
+ "fmt"
+ "reflect"
+ "sort"
+)
+
+var (
+ RevisionCache = make(map[string]interface{})
+)
+
+type Revision struct {
+ Config *DataRevision
+ Children map[string][]*Revision
+ Hash string
+ branch *Branch
+ WeakRef string
+}
+
+func NewRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
+ cr := &Revision{}
+ cr.branch = branch
+ cr.Config = NewDataRevision(data)
+ cr.Children = children
+ cr.finalize()
+
+ return cr
+}
+
+func (cr *Revision) finalize() {
+ cr.Hash = cr.hashContent()
+
+ if _, exists := RevisionCache[cr.Hash]; !exists {
+ RevisionCache[cr.Hash] = cr
+ }
+ if _, exists := RevisionCache[cr.Config.Hash]; !exists {
+ RevisionCache[cr.Config.Hash] = cr.Config
+ } else {
+ cr.Config = RevisionCache[cr.Config.Hash].(*DataRevision)
+ }
+}
+
+func (cr *Revision) hashContent() string {
+ var buffer bytes.Buffer
+ var childrenKeys []string
+
+ if cr.Config != nil {
+ buffer.WriteString(cr.Config.Hash)
+ }
+
+ for key := range cr.Children {
+ childrenKeys = append(childrenKeys, key)
+ }
+ sort.Strings(childrenKeys)
+
+ if cr.Children != nil && len(cr.Children) > 0 {
+ // Loop through sorted Children keys
+ for _, key := range childrenKeys {
+ for _, child := range cr.Children[key] {
+ buffer.WriteString(child.Hash)
+ }
+ }
+ }
+
+ return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
+}
+
+func (cr *Revision) getData() interface{} {
+ if cr.Config == nil {
+ return nil
+ }
+ return cr.Config.Data
+}
+
+func (cr *Revision) getNode() *Node {
+ return cr.branch.node
+}
+
+func (cr *Revision) getType() reflect.Type {
+ // TODO: what is this returning really?
+ return reflect.TypeOf(cr.getData())
+}
+
+func (cr *Revision) clearHash() {
+ cr.Hash = ""
+}
+
+func (cr *Revision) Get(depth int) interface{} {
+ originalData := cr.getData()
+ data := Clone(originalData)
+
+ if depth > 0 {
+ for fieldName, field := range ChildrenFields(cr.getData()) {
+ if field.IsContainer {
+ for _, rev := range cr.Children[fieldName] {
+ childData := rev.Get(depth - 1)
+ childDataHolder := GetAttributeValue(data, fieldName, 0)
+ // TODO: merge with childData
+ fmt.Printf("data:%+v, dataHolder:%+v", childData, childDataHolder)
+ }
+ } else {
+ rev := cr.Children[fieldName][0]
+ childData := rev.Get(depth - 1)
+ childDataHolder := GetAttributeValue(data, fieldName, 0)
+ // TODO: merge with childData
+ fmt.Printf("data:%+v, dataHolder:%+v", childData, childDataHolder)
+ }
+ }
+ }
+ return data
+}
+
+func (cr *Revision) UpdateData(data interface{}, branch *Branch) *Revision {
+ newRev := Clone(cr).(*Revision)
+ newRev.branch = branch
+ newRev.Config = NewDataRevision(data)
+ newRev.finalize()
+
+ return newRev
+}
+
+func (cr *Revision) UpdateChildren(name string, children []*Revision, branch *Branch) *Revision {
+ newChildren := Clone(cr.Children).(map[string][]*Revision)
+ newChildren[name] = children
+ newRev := Clone(cr).(*Revision)
+ newRev.branch = branch
+ newRev.Children = newChildren
+ newRev.finalize()
+
+ return newRev
+}
+
+func (cr *Revision) UpdateAllChildren(children map[string][]*Revision, branch *Branch) *Revision {
+ newRev := Clone(cr).(*Revision)
+ newRev.branch = branch
+ newRev.Children = children
+ newRev.finalize()
+
+ return newRev
+}
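Revisions are content addressed and meant to be immutable: UpdateData and UpdateChildren always go through Clone plus finalize and hand back a new revision, while identical content collapses onto the same hash and shares its DataRevision through RevisionCache. A small illustration, with the voltha protos assumed available as in the tests:

func exampleRevisionHashing() {
	branch := &Branch{}
	data := &voltha.Device{Id: "example-device"}

	rev1 := NewRevision(branch, data, make(map[string][]*Revision))
	rev2 := NewRevision(branch, data, make(map[string][]*Revision))

	// Same config and same (empty) children give the same hash and a shared
	// config revision out of RevisionCache.
	fmt.Printf("hash1:%s hash2:%s shared-config:%t\n", rev1.Hash, rev2.Hash, rev1.Config == rev2.Config)
}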
diff --git a/db/model/revision_test.go b/db/model/revision_test.go
new file mode 100644
index 0000000..9d3ebc5
--- /dev/null
+++ b/db/model/revision_test.go
@@ -0,0 +1,15 @@
+package model
+
+import (
+ "github.com/opencord/voltha/protos/go/voltha"
+ "testing"
+)
+
+func Test_Revision_01_New(t *testing.T) {
+ branch := &Branch{}
+ data := &voltha.Device{}
+ children := make(map[string][]*Revision)
+ rev := NewRevision(branch, data, children)
+
+ t.Logf("New revision created: %+v\n", rev)
+}
diff --git a/db/model/root.go b/db/model/root.go
new file mode 100644
index 0000000..722fc94
--- /dev/null
+++ b/db/model/root.go
@@ -0,0 +1,213 @@
+package model
+
+import (
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "github.com/google/uuid"
+ "reflect"
+)
+
+type Root struct {
+ *Node
+ DirtyNodes map[string]*Node
+ KvStore *Backend
+ Loading bool
+ RevisionClass interface{}
+ Callbacks []func() interface{}
+ NotificationCallbacks []func() interface{}
+}
+
+func NewRoot(initialData interface{}, kvStore *Backend, revisionClass interface{}) *Root {
+ root := &Root{}
+ root.KvStore = kvStore
+ root.DirtyNodes = make(map[string]*Node)
+ root.Loading = false
+ if kvStore != nil /*&& FIXME: RevisionClass is a subclass of PersistedConfigRevision */ {
+ revisionClass = reflect.TypeOf(PersistedRevision{})
+ }
+ root.RevisionClass = revisionClass
+ root.Callbacks = []func() interface{}{}
+ root.NotificationCallbacks = []func() interface{}{}
+
+ root.Node = NewNode(root, initialData, false, "")
+
+ return root
+}
+
+func (r *Root) makeRevision(branch *Branch, data interface{}, children map[string][]*Revision) *Revision {
+
+ return &Revision{}
+}
+
+func (r *Root) makeTxBranch() string {
+ txid_bin, _ := uuid.New().MarshalBinary()
+ txid := hex.EncodeToString(txid_bin)[:12]
+ r.DirtyNodes[txid] = r.Node
+ r.Node.makeTxBranch(txid)
+ return txid
+}
+
+func (r *Root) deleteTxBranch(txid string) {
+ for _, dirtyNode := range r.DirtyNodes {
+ dirtyNode.deleteTxBranch(txid)
+ }
+ delete(r.DirtyNodes, txid)
+}
+
+func (r *Root) foldTxBranch(txid string) {
+ // TODO: implement foldTxBranch
+ // if err := r.Node.mergeTxBranch(txid, dryRun=true); err != nil {
+ // r.deleteTxBranch(txid)
+ // } else {
+ // r.Node.mergeTxBranch(txid)
+ // r.executeCallbacks()
+ // }
+}
+
+func (r *Root) executeCallbacks() {
+ for len(r.Callbacks) > 0 {
+ callback := r.Callbacks[0]
+ r.Callbacks = r.Callbacks[1:]
+ callback()
+ }
+ for len(r.NotificationCallbacks) > 0 {
+ callback := r.NotificationCallbacks[0]
+ r.NotificationCallbacks = r.NotificationCallbacks[1:]
+ callback()
+ }
+}
+
+func (r *Root) noCallbacks() bool {
+ return len(r.Callbacks) == 0
+}
+
+func (r *Root) addCallback(callback func() interface{}) {
+ r.Callbacks = append(r.Callbacks, callback)
+}
+func (r *Root) addNotificationCallback(callback func() interface{}) {
+ r.NotificationCallbacks = append(r.NotificationCallbacks, callback)
+}
+
+func (r *Root) Update(path string, data interface{}, strict bool, txid string, makeBranch t_makeBranch) *Revision {
+ var result *Revision
+ // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+
+ if makeBranch == nil {
+ // TODO: raise error
+ }
+
+ if r.noCallbacks() {
+ // TODO: raise error
+ }
+
+ if txid != "" {
+ //dirtied := r.DirtyNodes[txid]
+
+ trackDirty := func(node *Node) *Branch {
+ //dirtied.Add(Node)
+ return node.makeTxBranch(txid)
+ }
+ result = r.Node.Update(path, data, strict, txid, trackDirty)
+ } else {
+ result = r.Node.Update(path, data, strict, "", nil)
+ }
+
+ r.executeCallbacks()
+
+ return result
+}
+
+func (r *Root) Add(path string, data interface{}, txid string, makeBranch t_makeBranch) *Revision {
+ var result *Revision
+ // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+
+ if makeBranch == nil {
+ // TODO: raise error
+ }
+
+ if r.noCallbacks() {
+ // TODO: raise error
+ }
+
+ if txid != "" {
+ //dirtied := r.DirtyNodes[txid]
+
+ trackDirty := func(node *Node) *Branch {
+ //dirtied.Add(Node)
+ return node.makeTxBranch(txid)
+ }
+ result = r.Node.Add(path, data, txid, trackDirty)
+ } else {
+ result = r.Node.Add(path, data, "", nil)
+ }
+
+ r.executeCallbacks()
+
+ return result
+}
+
+func (r *Root) Remove(path string, txid string, makeBranch t_makeBranch) *Revision {
+ var result *Revision
+ // FIXME: the more i look at this... i think i need to implement an interface for Node & root
+
+ if makeBranch == nil {
+ // TODO: raise error
+ }
+
+ if r.noCallbacks() {
+ // TODO: raise error
+ }
+
+ if txid != "" {
+ //dirtied := r.DirtyNodes[txid]
+
+ trackDirty := func(node *Node) *Branch {
+ //dirtied.Add(Node)
+ return node.makeTxBranch(txid)
+ }
+ result = r.Node.Remove(path, txid, trackDirty)
+ } else {
+ result = r.Node.Remove(path, "", nil)
+ }
+
+ r.executeCallbacks()
+
+ return result
+}
+
+func (r *Root) Load(rootClass interface{}) *Root {
+ //fakeKvStore := &Backend{}
+ //root := NewRoot(rootClass, fakeKvStore, PersistedRevision{})
+ //r.KvStore = KvStore
+ r.loadFromPersistence(rootClass)
+ return r
+}
+
+func (r *Root) LoadLatest(hash string) {
+ r.Node.LoadLatest(r.KvStore, hash)
+}
+
+type rootData struct {
+ Latest string `json:"latest"`
+ Tags map[string]string `json:"tags"`
+}
+
+func (r *Root) loadFromPersistence(rootClass interface{}) {
+ var data rootData
+
+ r.Loading = true
+ blob, _ := r.KvStore.Get("root")
+
+ if err := json.Unmarshal(blob.Value.([]byte), &data); err != nil {
+ fmt.Printf("problem unmarshalling blob - error:%s\n", err.Error())
+ }
+
+ for tag, hash := range data.Tags {
+ r.Node.LoadLatest(r.KvStore, hash)
+ r.Node.Tags[tag] = r.Node.Latest()
+ }
+
+ r.Node.LoadLatest(r.KvStore, data.Latest)
+ r.Loading = false
+}
diff --git a/db/model/root_test.go b/db/model/root_test.go
new file mode 100644
index 0000000..f9aeb67
--- /dev/null
+++ b/db/model/root_test.go
@@ -0,0 +1,44 @@
+package model
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/opencord/voltha/protos/go/voltha"
+ "testing"
+ "time"
+)
+
+var (
+ backend *Backend
+ //rootPrefix = "service/voltha/data/core/0001"
+
+ basePrefix = "service/voltha/service/vcores/data/devices"
+ deviceId = "00016f13befaedcc"
+ rootPrefix = basePrefix + "/" + deviceId
+ deviceProxy = "/devices/" + deviceId
+)
+
+func Test_NewRoot(t *testing.T) {
+ backend = NewBackend(ETCD_KV, etcd_host, etcd_port, timeout, rootPrefix)
+
+ //var msgClass *voltha.VolthaInstance
+ var msgClass *voltha.DeviceInstance
+ root := NewRoot(msgClass, backend, nil)
+
+ start := time.Now()
+
+ r := root.Load(msgClass)
+ afterLoad := time.Now()
+ fmt.Printf(">>>>>>>>>>>>> Time to load : %f\n", afterLoad.Sub(start).Seconds())
+
+ d := r.Node.Get(deviceProxy, "", 0, false, "")
+ afterGet := time.Now()
+ fmt.Printf(">>>>>>>>>>>>> Time to load and get: %f\n", afterGet.Sub(start).Seconds())
+
+ jr, _ := json.Marshal(r)
+ fmt.Printf("Content of ROOT --> \n%s\n", jr)
+
+ jd, _ := json.Marshal(d)
+ fmt.Printf("Content of GET --> \n%s\n", jd)
+
+}
diff --git a/db/model/transaction.go b/db/model/transaction.go
new file mode 100644
index 0000000..b7288ab
--- /dev/null
+++ b/db/model/transaction.go
@@ -0,0 +1,53 @@
+package model
+
+import "fmt"
+
+type Transaction struct {
+ proxy *Proxy
+ txid string
+}
+
+func NewTransaction(proxy *Proxy, txid string) *Transaction {
+ tx := &Transaction{
+ proxy: proxy,
+ txid: txid,
+ }
+ return tx
+}
+func (t *Transaction) Get(path string, depth int, deep bool) *Revision {
+ if t.txid == "" {
+ fmt.Printf("closed transaction\n")
+ return nil
+ }
+ // TODO: need to review the return values at the different layers!!!!!
+ return t.proxy.Get(path, depth, deep, t.txid).(*Revision)
+}
+func (t *Transaction) Update(path string, data interface{}, strict bool) *Revision {
+ if t.txid == "" {
+ fmt.Printf("closed transaction\n")
+ return nil
+ }
+ return t.proxy.Update(path, data, strict, t.txid).(*Revision)
+}
+func (t *Transaction) Add(path string, data interface{}) *Revision {
+ if t.txid == "" {
+ fmt.Printf("closed transaction\n")
+ return nil
+ }
+ return t.proxy.Add(path, data, t.txid).(*Revision)
+}
+func (t *Transaction) Remove(path string) *Revision {
+ if t.txid == "" {
+ fmt.Printf("closed transaction\n")
+ return nil
+ }
+ return t.proxy.Remove(path, t.txid).(*Revision)
+}
+func (t *Transaction) Cancel() {
+ t.proxy.cancelTransaction(t.txid)
+ t.txid = ""
+}
+func (t *Transaction) Commit() {
+ t.proxy.commitTransaction(t.txid)
+ t.txid = ""
+}
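Transactions tie a proxy to a per-txid branch: every change made through the transaction lands on that branch, Commit folds it back (the actual merge in Root.foldTxBranch is still a TODO in this commit) and Cancel discards it. A sketch of the call pattern, with the proxy and data supplied by the caller:

func exampleTransaction(proxy *Proxy, data interface{}) {
	tx := proxy.openTransaction()

	// Updates made here are isolated on the transaction's branch.
	tx.Update("/", data, false)

	// Commit folds the branch back into the main line; Cancel would drop it.
	tx.Commit()
}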
diff --git a/db/model/utils.go b/db/model/utils.go
new file mode 100644
index 0000000..d8079f9
--- /dev/null
+++ b/db/model/utils.go
@@ -0,0 +1,231 @@
+package model
+
+import (
+ "bytes"
+ "encoding/gob"
+ "reflect"
+ "strings"
+)
+
+func IsProtoMessage(object interface{}) bool {
+ var ok bool
+
+ if object != nil {
+ st := reflect.TypeOf(object)
+ _, ok = st.MethodByName("ProtoMessage")
+ }
+ return ok
+}
+
+func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
+ k := obj.Kind()
+ switch k {
+ case reflect.Ptr:
+ t := obj.Type().Elem()
+ n := reflect.New(t)
+
+ if rc := FindOwnerType(n.Elem(), name, depth+1, false); rc != nil {
+ return rc
+ }
+
+ case reflect.Struct:
+ for i := 0; i < obj.NumField(); i += 1 {
+ v := reflect.Indirect(obj)
+
+ json := strings.Split(v.Type().Field(i).Tag.Get("json"), ",")
+
+ if json[0] == name {
+ return FindOwnerType(obj.Field(i), name, depth+1, true)
+ }
+
+ if rc := FindOwnerType(obj.Field(i), name, depth+1, false); rc != nil {
+ return rc
+ }
+ }
+ case reflect.Slice:
+ s := reflect.MakeSlice(obj.Type(), 1, 1)
+ n := reflect.New(obj.Type())
+ n.Elem().Set(s)
+
+ for i := 0; i < n.Elem().Len(); i += 1 {
+ if found {
+ return reflect.ValueOf(n.Elem().Index(i).Interface()).Type()
+ }
+ }
+
+ for i := 0; i < obj.Len(); i += 1 {
+ if found {
+ return obj.Index(i).Type()
+ }
+
+ if rc := FindOwnerType(obj.Index(i), name, depth+1, false); rc != nil {
+ return rc
+ }
+ }
+ default:
+ //fmt.Printf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
+ }
+
+ return nil
+}
+
+func FindKeyOwner(iface interface{}, name string, depth int) interface{} {
+ obj := reflect.ValueOf(iface)
+ k := obj.Kind()
+ switch k {
+ case reflect.Ptr:
+ t := obj.Type().Elem()
+ n := reflect.New(t)
+
+ if rc := FindKeyOwner(n.Elem().Interface(), name, depth+1); rc != nil {
+ return rc
+ }
+
+ case reflect.Struct:
+ for i := 0; i < obj.NumField(); i++ {
+ json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
+
+ if json[0] == name {
+ return obj.Type().Field(i).Type
+ }
+
+ if rc := FindKeyOwner(obj.Field(i).Interface(), name, depth+1); rc != nil {
+ return rc
+ }
+ }
+
+ case reflect.Slice:
+ s := reflect.MakeSlice(obj.Type(), 1, 1)
+ n := reflect.New(obj.Type())
+ n.Elem().Set(s)
+
+ for i := 0; i < n.Elem().Len(); i += 1 {
+ if rc := FindKeyOwner(n.Elem().Index(i).Interface(), name, depth+1); rc != nil {
+ return rc
+ }
+ }
+ default:
+ //fmt.Printf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
+ }
+
+ return nil
+}
+
+// FIXME: Need to figure out if GetAttributeValue and GetAttributeStructure can become one
+// Code is repeated in both, but outputs have a different purpose
+// Left as-is for now to get things working
+func GetAttributeValue(data interface{}, name string, depth int) reflect.Value {
+ var result reflect.Value
+ obj := reflect.ValueOf(data)
+
+ if !obj.IsValid() {
+ return result
+ }
+
+ k := obj.Kind()
+ switch k {
+ case reflect.Ptr:
+ if obj.IsNil() {
+ return result
+ }
+
+ // Follow the actual value being pointed at, not a freshly allocated zero value
+ if rc := GetAttributeValue(obj.Elem().Interface(), name, depth+1); rc.IsValid() {
+ return rc
+ }
+
+ case reflect.Struct:
+ for i := 0; i < obj.NumField(); i++ {
+ json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
+
+ if json[0] == name {
+ return obj.Field(i)
+ }
+
+ if obj.Field(i).IsValid() {
+ if rc := GetAttributeValue(obj.Field(i).Interface(), name, depth+1); rc.IsValid() {
+ return rc
+ }
+ }
+ }
+
+ case reflect.Slice:
+ s := reflect.MakeSlice(obj.Type(), 1, 1)
+ n := reflect.New(obj.Type())
+ n.Elem().Set(s)
+
+ for i := 0; i < obj.Len(); i += 1 {
+ if rc := GetAttributeValue(obj.Index(i).Interface(), name, depth+1); rc.IsValid() {
+ return rc
+ }
+ }
+ default:
+ //fmt.Printf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
+ }
+
+ return result
+
+}
+
+// FIXME: See GetAttributeValue(...) comment
+func GetAttributeStructure(data interface{}, name string, depth int) reflect.StructField {
+ var result reflect.StructField
+ obj := reflect.ValueOf(data)
+
+ if !obj.IsValid() {
+ return result
+ }
+
+ k := obj.Kind()
+ switch k {
+ case reflect.Ptr:
+ t := obj.Type().Elem()
+ n := reflect.New(t)
+
+ if rc := GetAttributeStructure(n.Elem().Interface(), name, depth+1); rc.Name != "" {
+ return rc
+ }
+
+ case reflect.Struct:
+ for i := 0; i < obj.NumField(); i++ {
+ v := reflect.Indirect(obj)
+ json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
+
+ if json[0] == name {
+ return v.Type().Field(i)
+ }
+
+ if obj.Field(i).IsValid() {
+ if rc := GetAttributeStructure(obj.Field(i).Interface(), name, depth+1); rc.Name != "" {
+ return rc
+ }
+ }
+ }
+
+ case reflect.Slice:
+ s := reflect.MakeSlice(obj.Type(), 1, 1)
+ n := reflect.New(obj.Type())
+ n.Elem().Set(s)
+
+ for i := 0; i < obj.Len(); i += 1 {
+ if rc := GetAttributeStructure(obj.Index(i).Interface(), name, depth+1); rc.Name != "" {
+ return rc
+ }
+
+ }
+ default:
+ //fmt.Printf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
+ }
+
+ return result
+
+}
+
+func Clone(a interface{}) interface{} {
+ if a == nil {
+ return nil
+ }
+
+ // Deep-copy by encoding the original through gob and decoding it into a
+ // freshly allocated value of the same dynamic type. Note that interface
+ // typed fields (e.g. DataRevision.Data) need gob.Register to round-trip.
+ b := reflect.New(reflect.TypeOf(a))
+ buff := new(bytes.Buffer)
+ enc := gob.NewEncoder(buff)
+ dec := gob.NewDecoder(buff)
+ enc.Encode(a)
+ dec.Decode(b.Interface())
+
+ return b.Elem().Interface()
+}
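The reflection helpers above are what let the node code address protobuf fields by their proto (json tag) names rather than their Go field names. A short sketch, assuming the voltha protos as used by the tests in this package:

func exampleUtils() {
	device := &voltha.Device{Id: "example-device", Vlan: 1234}

	// Lookup is by json tag, so "id" resolves to the Id field.
	id := GetAttributeValue(device, "id", 0)
	vlan := GetAttributeValue(device, "vlan", 0)

	fmt.Printf("proto:%t id:%s vlan:%d\n", IsProtoMessage(device), id.String(), vlan.Uint())
}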