VOL-2154 : Moving back db/model to voltha-go repo
- Package imports and dockerfiles updated
- Left backend.go in voltha-lib-go
Amendments:
- Adjusted proto imports in unit tests
- Adjusted references to db/model in unit tests
- Integrate voltha-lib-go updates
Change-Id: I2d168c516a238222f0371a7bcb672d2b06796838
diff --git a/db/model/base_test.go b/db/model/base_test.go
new file mode 100644
index 0000000..45a6cdc
--- /dev/null
+++ b/db/model/base_test.go
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/db"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "runtime/debug"
+ "sync"
+)
+
+type ModelTestConfig struct {
+ Root *root
+ Backend *db.Backend
+ RootProxy *Proxy
+ DbPrefix string
+ DbType string
+ DbHost string
+ DbPort int
+ DbTimeout int
+}
+
+var callbackMutex sync.Mutex
+
+func commonChanCallback(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %d", len(args))
+
+ //for i := 0; i < len(args); i++ {
+ // log.Infof("ARG %d : %+v", i, args[i])
+ //}
+
+ callbackMutex.Lock()
+ defer callbackMutex.Unlock()
+
+ execDoneChan := args[1].(*chan struct{})
+
+ // Inform the caller that the callback was executed
+ if *execDoneChan != nil {
+ log.Infof("Sending completion indication - stack:%s", string(debug.Stack()))
+ close(*execDoneChan)
+ *execDoneChan = nil
+ }
+
+ return nil
+}
+
+func commonCallback2(args ...interface{}) interface{} {
+ log.Infof("Running common2 callback - arg count: %d %+v", len(args), args)
+
+ return nil
+}
+
+func commonCallbackFunc(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %d", len(args))
+
+ for i := 0; i < len(args); i++ {
+ log.Infof("ARG %d : %+v", i, args[i])
+ }
+ execStatusFunc := args[1].(func(bool))
+
+ // Inform the caller that the callback was executed
+ execStatusFunc(true)
+
+ return nil
+}
+
+func firstCallback(args ...interface{}) interface{} {
+ name := args[0]
+ id := args[1]
+ log.Infof("Running first callback - name: %s, id: %s\n", name, id)
+ return nil
+}
+
+func secondCallback(args ...interface{}) interface{} {
+ name := args[0].(map[string]string)
+ id := args[1]
+ log.Infof("Running second callback - name: %s, id: %f\n", name["name"], id)
+ // FIXME: the panic call seem to interfere with the logging mechanism
+ //panic("Generating a panic in second callback")
+ return nil
+}
+
+func thirdCallback(args ...interface{}) interface{} {
+ name := args[0]
+ id := args[1].(*voltha.Device)
+ log.Infof("Running third callback - name: %+v, id: %s\n", name, id.Id)
+ return nil
+}
diff --git a/db/model/branch.go b/db/model/branch.go
new file mode 100644
index 0000000..957e0ca
--- /dev/null
+++ b/db/model/branch.go
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "sync"
+)
+
+// TODO: implement weak references or something equivalent
+// TODO: missing proper logging
+
+// Branch structure is used to classify a collection of transaction based revisions
+type Branch struct {
+ mutex sync.RWMutex
+ Node *node
+ Txid string
+ Origin Revision
+ Revisions map[string]Revision
+ LatestLock sync.RWMutex
+ Latest Revision
+}
+
+// NewBranch creates a new instance of the Branch structure
+func NewBranch(node *node, txid string, origin Revision, autoPrune bool) *Branch {
+ b := &Branch{}
+ b.Node = node
+ b.Txid = txid
+ b.Origin = origin
+ b.Revisions = make(map[string]Revision)
+ b.Latest = origin
+
+ return b
+}
+
+// Utility function to extract all children names for a given revision (mostly for debugging purposes)
+func (b *Branch) retrieveChildrenNames(revision Revision) []string {
+ var childrenNames []string
+
+ for _, child := range revision.GetChildren("devices") {
+ childrenNames = append(childrenNames, child.GetName())
+ }
+
+ return childrenNames
+}
+
+// Utility function to compare children names and report the missing ones (mostly for debugging purposes)
+func (b *Branch) findMissingChildrenNames(previousNames, latestNames []string) []string {
+ var missingNames []string
+
+ for _, previousName := range previousNames {
+ found := false
+
+ if len(latestNames) == 0 {
+ break
+ }
+
+ for _, latestName := range latestNames {
+ if previousName == latestName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ missingNames = append(missingNames, previousName)
+ }
+ }
+
+ return missingNames
+}
+
+// SetLatest assigns the latest revision for this branch
+func (b *Branch) SetLatest(latest Revision) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ if b.Latest != nil {
+ log.Debugw("updating-latest-revision", log.Fields{"current": b.Latest.GetHash(), "new": latest.GetHash()})
+
+ // Go through list of children names in current revision and new revision
+ // and then compare the resulting outputs to ensure that we have not lost any entries.
+
+ if level, _ := log.GetPackageLogLevel(); level == log.DebugLevel {
+ var previousNames, latestNames, missingNames []string
+
+ if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
+ log.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
+ }
+
+ if latestNames = b.retrieveChildrenNames(latest); len(latestNames) > 0 {
+ log.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
+ }
+
+ if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
+ log.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
+ }
+ }
+
+ } else {
+ log.Debugw("setting-latest-revision", log.Fields{"new": latest.GetHash()})
+ }
+
+ b.Latest = latest
+}
+
+// GetLatest retrieves the latest revision of the branch
+func (b *Branch) GetLatest() Revision {
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
+
+ return b.Latest
+}
+
+// GetOrigin retrieves the original revision of the branch
+func (b *Branch) GetOrigin() Revision {
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
+
+ return b.Origin
+}
+
+// AddRevision inserts a new revision to the branch
+func (b *Branch) AddRevision(revision Revision) {
+ if revision != nil && b.GetRevision(revision.GetHash()) == nil {
+ b.SetRevision(revision.GetHash(), revision)
+ }
+}
+
+// GetRevision pulls a revision entry at the specified hash
+func (b *Branch) GetRevision(hash string) Revision {
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
+
+ if revision, ok := b.Revisions[hash]; ok {
+ return revision
+ }
+
+ return nil
+}
+
+// SetRevision updates a revision entry at the specified hash
+func (b *Branch) SetRevision(hash string, revision Revision) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ b.Revisions[hash] = revision
+}
+
+// DeleteRevision removes a revision with the specified hash
+func (b *Branch) DeleteRevision(hash string) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ if _, ok := b.Revisions[hash]; ok {
+ delete(b.Revisions, hash)
+ }
+}
diff --git a/db/model/branch_test.go b/db/model/branch_test.go
new file mode 100644
index 0000000..cf8406c
--- /dev/null
+++ b/db/model/branch_test.go
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "testing"
+)
+
+var (
+ TestBranch_BRANCH *Branch
+ TestBranch_HASH string
+)
+
+// Create a new branch and ensure that fields are populated
+func TestBranch_NewBranch(t *testing.T) {
+ node := &node{}
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("origin_hash")))
+ origin := &NonPersistedRevision{
+ Config: &DataRevision{},
+ Children: make(map[string][]Revision),
+ Hash: hash,
+ Branch: &Branch{},
+ }
+ txid := fmt.Sprintf("%x", md5.Sum([]byte("branch_transaction_id")))
+
+ TestBranch_BRANCH = NewBranch(node, txid, origin, true)
+ t.Logf("New Branch(txid:%s) created: %+v\n", txid, TestBranch_BRANCH)
+
+ if TestBranch_BRANCH.Latest == nil {
+ t.Errorf("Branch latest pointer is nil")
+ } else if TestBranch_BRANCH.Origin == nil {
+ t.Errorf("Branch origin pointer is nil")
+ } else if TestBranch_BRANCH.Node == nil {
+ t.Errorf("Branch node pointer is nil")
+ } else if TestBranch_BRANCH.Revisions == nil {
+ t.Errorf("Branch revisions map is nil")
+ } else if TestBranch_BRANCH.Txid == "" {
+ t.Errorf("Branch transaction id is empty")
+ }
+}
+
+// Add a new revision to the branch
+func TestBranch_AddRevision(t *testing.T) {
+ TestBranch_HASH = fmt.Sprintf("%x", md5.Sum([]byte("revision_hash")))
+ rev := &NonPersistedRevision{
+ Config: &DataRevision{},
+ Children: make(map[string][]Revision),
+ Hash: TestBranch_HASH,
+ Branch: &Branch{},
+ }
+
+ TestBranch_BRANCH.AddRevision(rev)
+ t.Logf("Added revision: %+v\n", rev)
+
+ if len(TestBranch_BRANCH.Revisions) == 0 {
+ t.Errorf("Branch revisions map is empty")
+ }
+}
+
+// Ensure that the added revision can be retrieved
+func TestBranch_GetRevision(t *testing.T) {
+ if rev := TestBranch_BRANCH.GetRevision(TestBranch_HASH); rev == nil {
+ t.Errorf("Unable to retrieve revision for hash:%s", TestBranch_HASH)
+ } else {
+ t.Logf("Got revision for hash:%s rev:%+v\n", TestBranch_HASH, rev)
+ }
+}
+
+// Set the added revision as the latest
+func TestBranch_LatestRevision(t *testing.T) {
+ addedRevision := TestBranch_BRANCH.GetRevision(TestBranch_HASH)
+ TestBranch_BRANCH.SetLatest(addedRevision)
+
+ rev := TestBranch_BRANCH.GetLatest()
+ t.Logf("Retrieved latest revision :%+v", rev)
+
+ if rev == nil {
+ t.Error("Unable to retrieve latest revision")
+ } else if rev.GetHash() != TestBranch_HASH {
+ t.Errorf("Latest revision does not match hash: %s", TestBranch_HASH)
+ }
+}
+
+// Ensure that the origin revision remains and differs from subsequent revisions
+func TestBranch_OriginRevision(t *testing.T) {
+ rev := TestBranch_BRANCH.Origin
+ t.Logf("Retrieved origin revision :%+v", rev)
+
+ if rev == nil {
+ t.Error("Unable to retrieve origin revision")
+ } else if rev.GetHash() == TestBranch_HASH {
+ t.Errorf("Origin revision should differ from added revision: %s", TestBranch_HASH)
+ }
+}
diff --git a/db/model/callback_type.go b/db/model/callback_type.go
new file mode 100644
index 0000000..b530dee
--- /dev/null
+++ b/db/model/callback_type.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+// CallbackType is an enumerated value to express when a callback should be executed
+type CallbackType uint8
+
+// Enumerated list of callback types
+const (
+ GET CallbackType = iota
+ PRE_UPDATE
+ POST_UPDATE
+ PRE_ADD
+ POST_ADD
+ PRE_REMOVE
+ POST_REMOVE
+ POST_LISTCHANGE
+)
+
+var enumCallbackTypes = []string{
+ "GET",
+ "PRE_UPDATE",
+ "POST_UPDATE",
+ "PRE_ADD",
+ "POST_ADD",
+ "PRE_REMOVE",
+ "POST_REMOVE",
+ "POST_LISTCHANGE",
+}
+
+func (t CallbackType) String() string {
+ return enumCallbackTypes[t]
+}
diff --git a/db/model/child_type.go b/db/model/child_type.go
new file mode 100644
index 0000000..5928192
--- /dev/null
+++ b/db/model/child_type.go
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ desc "github.com/golang/protobuf/descriptor"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "reflect"
+ "strconv"
+ "sync"
+)
+
+type childTypesSingleton struct {
+ mutex sync.RWMutex
+ Cache map[interface{}]map[string]*ChildType
+}
+
+var instanceChildTypes *childTypesSingleton
+var onceChildTypes sync.Once
+
+func getChildTypes() *childTypesSingleton {
+ onceChildTypes.Do(func() {
+ instanceChildTypes = &childTypesSingleton{}
+ })
+ return instanceChildTypes
+}
+
+func (s *childTypesSingleton) GetCache() map[interface{}]map[string]*ChildType {
+ s.mutex.RLock()
+ defer s.mutex.RUnlock()
+ return s.Cache
+}
+
+func (s *childTypesSingleton) SetCache(cache map[interface{}]map[string]*ChildType) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache = cache
+}
+
+func (s *childTypesSingleton) GetCacheEntry(key interface{}) (map[string]*ChildType, bool) {
+ s.mutex.RLock()
+ defer s.mutex.RUnlock()
+ childTypeMap, exists := s.Cache[key]
+ return childTypeMap, exists
+}
+
+func (s *childTypesSingleton) SetCacheEntry(key interface{}, value map[string]*ChildType) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache[key] = value
+}
+
+func (s *childTypesSingleton) ResetCache() {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache = make(map[interface{}]map[string]*ChildType)
+}
+
+// ChildType structure contains construct details of an object
+type ChildType struct {
+ ClassModule string
+ ClassType reflect.Type
+ IsContainer bool
+ Key string
+ KeyFromStr func(s string) interface{}
+}
+
+// ChildrenFields retrieves list of child objects associated to a given interface
+func ChildrenFields(cls interface{}) map[string]*ChildType {
+ if cls == nil {
+ return nil
+ }
+ var names map[string]*ChildType
+ var namesExist bool
+
+ if getChildTypes().GetCache() == nil {
+ getChildTypes().ResetCache()
+ }
+
+ msgType := reflect.TypeOf(cls)
+ inst := getChildTypes()
+
+ if names, namesExist = inst.GetCacheEntry(msgType.String()); !namesExist {
+ names = make(map[string]*ChildType)
+
+ _, md := desc.ForMessage(cls.(desc.Message))
+
+ // TODO: Do we need to validate MD for nil, panic or exception?
+ for _, field := range md.Field {
+ if options := field.GetOptions(); options != nil {
+ if proto.HasExtension(options, common.E_ChildNode) {
+ isContainer := *field.Label == descriptor.FieldDescriptorProto_LABEL_REPEATED
+ meta, _ := proto.GetExtension(options, common.E_ChildNode)
+
+ var keyFromStr func(string) interface{}
+ var ct ChildType
+
+ parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
+ if meta.(*common.ChildNode).GetKey() != "" {
+ keyType := FindKeyOwner(reflect.New(parentType).Elem().Interface(), meta.(*common.ChildNode).GetKey(), 0)
+
+ switch keyType.(reflect.Type).Name() {
+ case "string":
+ keyFromStr = func(s string) interface{} {
+ return s
+ }
+ case "int32":
+ keyFromStr = func(s string) interface{} {
+ i, _ := strconv.Atoi(s)
+ return int32(i)
+ }
+ case "int64":
+ keyFromStr = func(s string) interface{} {
+ i, _ := strconv.Atoi(s)
+ return int64(i)
+ }
+ case "uint32":
+ keyFromStr = func(s string) interface{} {
+ i, _ := strconv.Atoi(s)
+ return uint32(i)
+ }
+ case "uint64":
+ keyFromStr = func(s string) interface{} {
+ i, _ := strconv.Atoi(s)
+ return uint64(i)
+ }
+ default:
+ log.Errorf("Key type not implemented - type: %s\n", keyType.(reflect.Type))
+ }
+ }
+
+ ct = ChildType{
+ ClassModule: parentType.String(),
+ ClassType: parentType,
+ IsContainer: isContainer,
+ Key: meta.(*common.ChildNode).GetKey(),
+ KeyFromStr: keyFromStr,
+ }
+
+ names[field.GetName()] = &ct
+ }
+ }
+ }
+
+ getChildTypes().SetCacheEntry(msgType.String(), names)
+ } else {
+ entry, _ := inst.GetCacheEntry(msgType.String())
+ log.Debugf("Cache entry for %s: %+v", msgType.String(), entry)
+ }
+
+ return names
+}
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
new file mode 100644
index 0000000..3836858
--- /dev/null
+++ b/db/model/child_type_test.go
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "reflect"
+ "testing"
+)
+
+// Dissect a proto message by extracting all the children fields
+func TestChildType_01_Device_Proto_ChildrenFields(t *testing.T) {
+ var cls *voltha.Device
+
+ t.Logf("Extracting children fields from proto type: %s", reflect.TypeOf(cls))
+ names := ChildrenFields(cls)
+ t.Logf("Extracting children field names: %+v", names)
+
+ expectedKeys := []string{"ports", "flows", "flow_groups", "image_downloads", "pm_configs"}
+ for _, key := range expectedKeys {
+ if _, exists := names[key]; !exists {
+ t.Errorf("Missing key:%s from class type:%s", key, reflect.TypeOf(cls))
+ }
+ }
+}
+
+// Verify that the cache contains an entry for types on which ChildrenFields was performed
+func TestChildType_02_Cache_Keys(t *testing.T) {
+ if _, exists := getChildTypes().Cache[reflect.TypeOf(&voltha.Device{}).String()]; !exists {
+ t.Errorf("getChildTypeCache().Cache should have an entry of type: %+v\n", reflect.TypeOf(&voltha.Device{}).String())
+ }
+ for k := range getChildTypes().Cache {
+ t.Logf("getChildTypeCache().Cache Key:%+v\n", k)
+ }
+}
diff --git a/db/model/data_revision.go b/db/model/data_revision.go
new file mode 100644
index 0000000..35f5958
--- /dev/null
+++ b/db/model/data_revision.go
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "bytes"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+)
+
+// DataRevision stores the data associated to a revision along with its calculated checksum hash value
+type DataRevision struct {
+ Data interface{}
+ Hash string
+}
+
+// NewDataRevision creates a new instance of a DataRevision structure
+func NewDataRevision(root *root, data interface{}) *DataRevision {
+ dr := DataRevision{}
+ dr.Data = data
+ dr.Hash = dr.hashData(root, data)
+
+ return &dr
+}
+
+func (dr *DataRevision) hashData(root *root, data interface{}) string {
+ var buffer bytes.Buffer
+
+ if IsProtoMessage(data) {
+ if pbdata, err := proto.Marshal(data.(proto.Message)); err != nil {
+ log.Debugf("problem to marshal protobuf data --> err: %s", err.Error())
+ } else {
+ buffer.Write(pbdata)
+ // To ensure uniqueness in case data is nil, also include data type
+ buffer.Write([]byte(reflect.TypeOf(data).String()))
+ }
+
+ } else if reflect.ValueOf(data).IsValid() {
+ dataObj := reflect.New(reflect.TypeOf(data).Elem())
+ if json, err := json.Marshal(dataObj.Interface()); err != nil {
+ log.Debugf("problem to marshal data --> err: %s", err.Error())
+ } else {
+ buffer.Write(json)
+ }
+ } else {
+ dataObj := reflect.New(reflect.TypeOf(data).Elem())
+ buffer.Write(dataObj.Bytes())
+ }
+
+ // Add the root pointer that owns the current data for extra uniqueness
+ rootPtr := fmt.Sprintf("%p", root)
+ buffer.Write([]byte(rootPtr))
+
+ return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
+}
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
new file mode 100644
index 0000000..d0a21f1
--- /dev/null
+++ b/db/model/event_bus.go
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "encoding/json"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+)
+
+// EventBus contains the details required to communicate with the event bus mechanism
+type EventBus struct {
+ client *EventBusClient
+ topic string
+}
+
+// ignoredCallbacks keeps a list of callbacks that should not be advertised on the event bus
+var (
+ ignoredCallbacks = map[CallbackType]struct{}{
+ PRE_ADD: {},
+ GET: {},
+ POST_LISTCHANGE: {},
+ PRE_REMOVE: {},
+ PRE_UPDATE: {},
+ }
+)
+
+// NewEventBus creates a new instance of the EventBus structure
+func NewEventBus() *EventBus {
+ bus := &EventBus{
+ client: NewEventBusClient(),
+ topic: "model-change-events",
+ }
+ return bus
+}
+
+// Advertise will publish the provided information to the event bus
+func (bus *EventBus) Advertise(args ...interface{}) interface{} {
+ eventType := args[0].(CallbackType)
+ hash := args[1].(string)
+ data := args[2:]
+
+ if _, ok := ignoredCallbacks[eventType]; ok {
+ log.Debugf("ignoring event - type:%s, data:%+v", eventType, data)
+ return nil
+ }
+ var kind voltha.ConfigEventType_ConfigEventType
+ switch eventType {
+ case POST_ADD:
+ kind = voltha.ConfigEventType_add
+ case POST_REMOVE:
+ kind = voltha.ConfigEventType_remove
+ default:
+ kind = voltha.ConfigEventType_update
+ }
+
+ var msg []byte
+ var err error
+ if IsProtoMessage(data) {
+ if msg, err = proto.Marshal(data[0].(proto.Message)); err != nil {
+ log.Debugf("problem marshalling proto data: %+v, err:%s", data[0], err.Error())
+ }
+ } else if data[0] != nil {
+ if msg, err = json.Marshal(data[0]); err != nil {
+ log.Debugf("problem marshalling json data: %+v, err:%s", data[0], err.Error())
+ }
+ } else {
+ log.Debugf("no data to advertise : %+v", data[0])
+ }
+
+ event := voltha.ConfigEvent{
+ Type: kind,
+ Hash: hash,
+ Data: string(msg),
+ }
+
+ bus.client.Publish(bus.topic, event)
+
+ return nil
+}
diff --git a/db/model/event_bus_client.go b/db/model/event_bus_client.go
new file mode 100644
index 0000000..d9a8d49
--- /dev/null
+++ b/db/model/event_bus_client.go
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+)
+
+// EventBusClient is an abstraction layer structure to communicate with an event bus mechanism
+type EventBusClient struct {
+}
+
+// NewEventBusClient creates a new EventBusClient instance
+func NewEventBusClient() *EventBusClient {
+ return &EventBusClient{}
+}
+
+// Publish sends a event to the bus
+func (ebc *EventBusClient) Publish(topic string, event voltha.ConfigEvent) {
+ log.Debugf("publishing event:%+v, topic:%s\n", event, topic)
+}
diff --git a/db/model/merge.go b/db/model/merge.go
new file mode 100644
index 0000000..07ae9b9
--- /dev/null
+++ b/db/model/merge.go
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+func revisionsAreEqual(a, b []Revision) bool {
+ // If one is nil, the other must also be nil.
+ if (a == nil) != (b == nil) {
+ return false
+ }
+
+ if len(a) != len(b) {
+ return false
+ }
+
+ for i := range a {
+ if a[i] != b[i] {
+ return false
+ }
+ }
+
+ return true
+}
+
+type changeAnalysis struct {
+ KeyMap1 map[string]int
+ KeyMap2 map[string]int
+ AddedKeys map[string]struct{}
+ RemovedKeys map[string]struct{}
+ ChangedKeys map[string]struct{}
+}
+
+func newChangeAnalysis(lst1, lst2 []Revision, keyName string) *changeAnalysis {
+ changes := &changeAnalysis{}
+
+ changes.KeyMap1 = make(map[string]int)
+ changes.KeyMap2 = make(map[string]int)
+
+ changes.AddedKeys = make(map[string]struct{})
+ changes.RemovedKeys = make(map[string]struct{})
+ changes.ChangedKeys = make(map[string]struct{})
+
+ for i, rev := range lst1 {
+ _, v := GetAttributeValue(rev.GetData(), keyName, 0)
+ changes.KeyMap1[v.String()] = i
+ }
+ for i, rev := range lst2 {
+ _, v := GetAttributeValue(rev.GetData(), keyName, 0)
+ changes.KeyMap2[v.String()] = i
+ }
+ for v := range changes.KeyMap2 {
+ if _, ok := changes.KeyMap1[v]; !ok {
+ changes.AddedKeys[v] = struct{}{}
+ }
+ }
+ for v := range changes.KeyMap1 {
+ if _, ok := changes.KeyMap2[v]; !ok {
+ changes.RemovedKeys[v] = struct{}{}
+ }
+ }
+ for v := range changes.KeyMap1 {
+ if _, ok := changes.KeyMap2[v]; ok && lst1[changes.KeyMap1[v]].GetHash() != lst2[changes.KeyMap2[v]].GetHash() {
+ changes.ChangedKeys[v] = struct{}{}
+ }
+ }
+
+ return changes
+}
+
+// Merge3Way takes care of combining the revision contents of the same data set
+func Merge3Way(
+ forkRev, srcRev, dstRev Revision,
+ mergeChildFunc func(Revision) Revision,
+ dryRun bool) (rev Revision, changes []ChangeTuple) {
+
+ log.Debugw("3-way-merge-request", log.Fields{"dryRun": dryRun})
+
+ var configChanged bool
+ var revsToDiscard []Revision
+
+ if dstRev.GetConfig() == forkRev.GetConfig() {
+ configChanged = dstRev.GetConfig() != srcRev.GetConfig()
+ } else {
+ if dstRev.GetConfig().Hash != srcRev.GetConfig().Hash {
+ log.Error("config-collision")
+ }
+ configChanged = true
+ }
+
+ //newChildren := reflect.ValueOf(dstRev.GetAllChildren()).Interface().(map[string][]Revision)
+ newChildren := make(map[string][]Revision)
+ for entryName, childrenEntry := range dstRev.GetAllChildren() {
+ //newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
+ newChildren[entryName] = make([]Revision, len(childrenEntry))
+ copy(newChildren[entryName], childrenEntry)
+ }
+
+ childrenFields := ChildrenFields(forkRev.GetData())
+
+ for fieldName, field := range childrenFields {
+ forkList := forkRev.GetChildren(fieldName)
+ srcList := srcRev.GetChildren(fieldName)
+ dstList := dstRev.GetChildren(fieldName)
+
+ if revisionsAreEqual(dstList, srcList) {
+ for _, rev := range srcList {
+ mergeChildFunc(rev)
+ }
+ continue
+ }
+
+ if field.Key == "" {
+ if revisionsAreEqual(dstList, forkList) {
+ if !revisionsAreEqual(srcList, forkList) {
+ log.Error("we should not be here")
+ } else {
+ for _, rev := range srcList {
+ newChildren[fieldName] = append(newChildren[fieldName], mergeChildFunc(rev))
+ }
+ if field.IsContainer {
+ changes = append(
+ changes, ChangeTuple{POST_LISTCHANGE,
+ NewOperationContext("", nil, fieldName, ""), nil},
+ )
+ }
+ }
+ } else {
+ if !revisionsAreEqual(srcList, forkList) {
+ log.Error("cannot merge - single child node or un-keyed children list has changed")
+ }
+ }
+ } else {
+ if revisionsAreEqual(dstList, forkList) {
+ src := newChangeAnalysis(forkList, srcList, field.Key)
+
+ newList := make([]Revision, len(srcList))
+ copy(newList, srcList)
+
+ for key := range src.AddedKeys {
+ idx := src.KeyMap2[key]
+ newRev := mergeChildFunc(newList[idx])
+
+ // FIXME: newRev may come back as nil... exclude those entries for now
+ if newRev != nil {
+ newList[idx] = newRev
+ changes = append(changes, ChangeTuple{POST_ADD, newList[idx].GetData(), newRev.GetData()})
+ }
+ }
+ for key := range src.RemovedKeys {
+ oldRev := forkList[src.KeyMap1[key]]
+ revsToDiscard = append(revsToDiscard, oldRev)
+ changes = append(changes, ChangeTuple{POST_REMOVE, oldRev.GetData(), nil})
+ }
+ for key := range src.ChangedKeys {
+ idx := src.KeyMap2[key]
+ newRev := mergeChildFunc(newList[idx])
+
+ // FIXME: newRev may come back as nil... exclude those entries for now
+ if newRev != nil {
+ newList[idx] = newRev
+ }
+ }
+
+ if !dryRun {
+ newChildren[fieldName] = newList
+ }
+ } else {
+ src := newChangeAnalysis(forkList, srcList, field.Key)
+ dst := newChangeAnalysis(forkList, dstList, field.Key)
+
+ newList := make([]Revision, len(dstList))
+ copy(newList, dstList)
+
+ for key := range src.AddedKeys {
+ if _, exists := dst.AddedKeys[key]; exists {
+ childDstRev := dstList[dst.KeyMap2[key]]
+ childSrcRev := srcList[src.KeyMap2[key]]
+ if childDstRev.GetHash() == childSrcRev.GetHash() {
+ mergeChildFunc(childDstRev)
+ } else {
+ log.Error("conflict error - revision has been added is different")
+ }
+ } else {
+ newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
+ newList = append(newList, newRev)
+ changes = append(changes, ChangeTuple{POST_ADD, srcList[src.KeyMap2[key]].GetData(), newRev.GetData()})
+ }
+ }
+ for key := range src.ChangedKeys {
+ if _, removed := dst.RemovedKeys[key]; removed {
+ log.Error("conflict error - revision has been removed")
+ } else if _, changed := dst.ChangedKeys[key]; changed {
+ childDstRev := dstList[dst.KeyMap2[key]]
+ childSrcRev := srcList[src.KeyMap2[key]]
+ if childDstRev.GetHash() == childSrcRev.GetHash() {
+ mergeChildFunc(childSrcRev)
+ } else if childDstRev.GetConfig().Hash != childSrcRev.GetConfig().Hash {
+ log.Error("conflict error - revision has been changed and is different")
+ } else {
+ newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
+ newList[dst.KeyMap2[key]] = newRev
+ }
+ } else {
+ newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
+ newList[dst.KeyMap2[key]] = newRev
+ }
+ }
+
+ // TODO: how do i sort this map in reverse order?
+ for key := range src.RemovedKeys {
+ if _, changed := dst.ChangedKeys[key]; changed {
+ log.Error("conflict error - revision has changed")
+ }
+ if _, removed := dst.RemovedKeys[key]; !removed {
+ dstIdx := dst.KeyMap2[key]
+ oldRev := newList[dstIdx]
+ revsToDiscard = append(revsToDiscard, oldRev)
+
+ copy(newList[dstIdx:], newList[dstIdx+1:])
+ newList[len(newList)-1] = nil
+ newList = newList[:len(newList)-1]
+
+ changes = append(changes, ChangeTuple{POST_REMOVE, oldRev.GetData(), nil})
+ }
+ }
+
+ if !dryRun {
+ newChildren[fieldName] = newList
+ }
+ }
+ }
+ }
+
+ if !dryRun && len(newChildren) > 0 {
+ if configChanged {
+ rev = srcRev
+ } else {
+ rev = dstRev
+ }
+
+ for _, discarded := range revsToDiscard {
+ discarded.Drop("", true)
+ }
+
+ // FIXME: Do not discard the latest value for now
+ //dstRev.GetBranch().GetLatest().Drop("", configChanged)
+ rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
+
+ if configChanged {
+ changes = append(changes, ChangeTuple{POST_UPDATE, dstRev.GetBranch().GetLatest().GetData(), rev.GetData()})
+ }
+ return rev, changes
+ }
+
+ return nil, nil
+}
diff --git a/db/model/model.go b/db/model/model.go
new file mode 100644
index 0000000..ba4a9b1
--- /dev/null
+++ b/db/model/model.go
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// init registers this package with the shared logging framework.
+func init() {
+ log.AddPackage(log.JSON, log.InfoLevel, log.Fields{"instanceId": "DB_MODEL"})
+ // BUG FIX: the previous "defer log.CleanUp()" ran as soon as init()
+ // returned, flushing and tearing down the logger immediately after it
+ // was configured. Cleanup must be driven by the application at
+ // shutdown, not deferred from init().
+}
+
+const (
+ // DataRefreshPeriod is the period used to determine when cached data
+ // requires a refresh from the KV store (in milliseconds)
+ // TODO: make this configurable?
+ DataRefreshPeriod int64 = 5000
+
+ // RequestTimestamp is the attribute used to store a timestamp in the context object
+ RequestTimestamp = "request-timestamp"
+
+ // ReservationTTL is the time limit for a KV path reservation (in seconds)
+ ReservationTTL int64 = 180
+)
diff --git a/db/model/node.go b/db/model/node.go
new file mode 100644
index 0000000..264a9dd
--- /dev/null
+++ b/db/model/node.go
@@ -0,0 +1,1161 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+// TODO: proper error handling
+// TODO: proper logging
+
+import (
+ "context"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+)
+
+// When a branch has no transaction id, everything gets stored in NONE
+const (
+ NONE string = "none"
+)
+
+// Node interface is an abstraction of the node data structure
+type Node interface {
+ // MakeLatest promotes the given revision to be the branch's latest and
+ // announces the supplied change tuples to registered callbacks.
+ MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
+
+ // CRUD functions
+ Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
+ Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
+ Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
+ CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy
+
+ GetProxy() *Proxy
+
+ // Branch management (one branch per transaction id)
+ MakeBranch(txid string) *Branch
+ DeleteBranch(txid string)
+ MergeBranch(txid string, dryRun bool) (Revision, error)
+
+ MakeTxBranch() string
+ DeleteTxBranch(txid string)
+ FoldTxBranch(txid string)
+}
+
+// node is the concrete implementation of the Node interface. It holds the
+// per-transaction branches, named revision tags and optional proxy/event-bus
+// attachments for a single location in the data model tree.
+type node struct {
+ // mutex serializes the public CRUD entry points (Get/List/Update/Add/Remove)
+ mutex sync.RWMutex
+ Root *root
+ Type interface{}
+ Branches map[string]*Branch
+ Tags map[string]Revision
+ Proxy *Proxy
+ EventBus *EventBus
+ AutoPrune bool
+}
+
+// ChangeTuple holds details of modifications made to a revision
+type ChangeTuple struct {
+ Type CallbackType
+ PreviousData interface{}
+ LatestData interface{}
+}
+
+// NewNode creates a new instance of the node data structure
+func NewNode(root *root, initialData interface{}, autoPrune bool, txid string) *node {
+ n := &node{}
+
+ n.Root = root
+ n.Branches = make(map[string]*Branch)
+ n.Tags = make(map[string]Revision)
+ n.Proxy = nil
+ n.EventBus = nil
+ n.AutoPrune = autoPrune
+
+ if IsProtoMessage(initialData) {
+ n.Type = reflect.ValueOf(initialData).Interface()
+ // Clone the proto message so the caller's copy cannot mutate the model
+ dataCopy := proto.Clone(initialData.(proto.Message))
+ n.initialize(dataCopy, txid)
+ } else if reflect.ValueOf(initialData).IsValid() {
+ // FIXME: this block does not reflect the original implementation
+ // it should be checking if the provided initial_data is already a type!??!
+ // it should be checked before IsProtoMessage
+ n.Type = reflect.ValueOf(initialData).Interface()
+ } else {
+ // not implemented error
+ log.Errorf("cannot process initial data - %+v", initialData)
+ }
+
+ return n
+}
+
+// MakeNode creates a new node in the tree, sharing this node's root
+func (n *node) MakeNode(data interface{}, txid string) *node {
+ return NewNode(n.Root, data, true, txid)
+}
+
+// MakeRevision creates a new revision of the node in the tree
+func (n *node) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ return n.GetRoot().MakeRevision(branch, data, children)
+}
+
+// makeLatest will mark the revision of a node as being the latest
+func (n *node) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+ // Keep a reference to the current revision
+ var previous string
+ if branch.GetLatest() != nil {
+ previous = branch.GetLatest().GetHash()
+ }
+
+ branch.AddRevision(revision)
+
+ // If anything is new, then set the revision as the latest
+ if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
+ if revision.GetName() != "" {
+ log.Debugw("saving-latest-data", log.Fields{"hash": revision.GetHash(), "data": revision.GetData()})
+ // Tag a timestamp to that revision
+ revision.SetLastUpdate()
+ GetRevCache().Set(revision.GetName(), revision)
+ }
+ branch.SetLatest(revision)
+ }
+
+ // Delete the previous revision if anything has changed
+ if previous != "" && previous != branch.GetLatest().GetHash() {
+ branch.DeleteRevision(previous)
+ }
+
+ // Callbacks are only announced for the committed (non-transaction) branch
+ if changeAnnouncement != nil && branch.Txid == "" {
+ if n.Proxy != nil {
+ for _, change := range changeAnnouncement {
+ log.Debugw("adding-callback",
+ log.Fields{
+ "callbacks": n.GetProxy().getCallbacks(change.Type),
+ "type": change.Type,
+ "previousData": change.PreviousData,
+ "latestData": change.LatestData,
+ })
+ n.Root.AddCallback(
+ n.GetProxy().InvokeCallbacks,
+ change.Type,
+ true,
+ change.PreviousData,
+ change.LatestData)
+ }
+ }
+ }
+}
+
+// Latest returns the latest revision of node with or without the transaction id;
+// when no (or an empty) txid is supplied, the NONE branch is consulted.
+func (n *node) Latest(txid ...string) Revision {
+ var branch *Branch
+
+ if len(txid) > 0 && txid[0] != "" {
+ if branch = n.GetBranch(txid[0]); branch != nil {
+ return branch.GetLatest()
+ }
+ } else if branch = n.GetBranch(NONE); branch != nil {
+ return branch.GetLatest()
+ }
+ // No matching branch exists
+ return nil
+}
+
+// initialize prepares the content of a node along with its possible ramifications
+func (n *node) initialize(data interface{}, txid string) {
+ children := make(map[string][]Revision)
+ for fieldName, field := range ChildrenFields(n.Type) {
+ _, fieldValue := GetAttributeValue(data, fieldName, 0)
+
+ if fieldValue.IsValid() {
+ if field.IsContainer {
+ if field.Key != "" {
+ // Keyed container: create one child node per element
+ for i := 0; i < fieldValue.Len(); i++ {
+ v := fieldValue.Index(i)
+
+ if rev := n.MakeNode(v.Interface(), txid).Latest(txid); rev != nil {
+ children[fieldName] = append(children[fieldName], rev)
+ }
+
+ // TODO: The following logic was ported from v1.0. Need to verify if it is required
+ //var keysSeen []string
+ //_, key := GetAttributeValue(v.Interface(), field.Key, 0)
+ //for _, k := range keysSeen {
+ // if k == key.String() {
+ // //log.Errorf("duplicate key - %s", k)
+ // }
+ //}
+ //keysSeen = append(keysSeen, key.String())
+ }
+
+ } else {
+ // Unkeyed container: children are tracked positionally
+ for i := 0; i < fieldValue.Len(); i++ {
+ v := fieldValue.Index(i)
+ if newNodeRev := n.MakeNode(v.Interface(), txid).Latest(); newNodeRev != nil {
+ children[fieldName] = append(children[fieldName], newNodeRev)
+ }
+ }
+ }
+ } else {
+ if newNodeRev := n.MakeNode(fieldValue.Interface(), txid).Latest(); newNodeRev != nil {
+ children[fieldName] = append(children[fieldName], newNodeRev)
+ }
+ }
+ } else {
+ log.Errorf("field is invalid - %+v", fieldValue)
+ }
+ }
+
+ // Record the initial revision on a fresh branch, keyed by txid (or NONE)
+ branch := NewBranch(n, "", nil, n.AutoPrune)
+ rev := n.MakeRevision(branch, data, children)
+ n.makeLatest(branch, rev, nil)
+
+ if txid == "" {
+ n.SetBranch(NONE, branch)
+ } else {
+ n.SetBranch(txid, branch)
+ }
+}
+
+// findRevByKey retrieves a specific revision from a node tree
+func (n *node) findRevByKey(revs []Revision, keyName string, value interface{}) (int, Revision) {
+ for i, rev := range revs {
+ dataValue := reflect.ValueOf(rev.GetData())
+ dataStruct := GetAttributeStructure(rev.GetData(), keyName, 0)
+
+ fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
+
+ // Compare the string forms of the key field and the requested value;
+ // this tolerates differing concrete key types
+ a := fmt.Sprintf("%s", fieldValue.Interface())
+ b := fmt.Sprintf("%s", value)
+ if a == b {
+ return i, revs[i]
+ }
+ }
+
+ // No revision matches the key
+ return -1, nil
+}
+
+// List retrieves the data from a node tree that resides at the specified path,
+// always loading the entries from persistence
+func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{} {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+
+ log.Debugw("node-list-request", log.Fields{"path": path, "hash": hash, "depth": depth, "deep": deep, "txid": txid})
+ // deep listing means unlimited depth
+ if deep {
+ depth = -1
+ }
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ var branch *Branch
+ var rev Revision
+
+ // Fall back to the committed (NONE) branch when no transaction branch exists
+ if branch = n.GetBranch(txid); txid == "" || branch == nil {
+ branch = n.GetBranch(NONE)
+ }
+
+ if hash != "" {
+ rev = branch.GetRevision(hash)
+ } else {
+ rev = branch.GetLatest()
+ }
+
+ // Collect the data of every revision found in persistence for this path
+ var result interface{}
+ var prList []interface{}
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil {
+ for _, revEntry := range pr {
+ prList = append(prList, revEntry.GetData())
+ }
+ result = prList
+ }
+
+ return result
+}
+
+// Get retrieves the data from a node tree that resides at the specified path.
+// Unless reconcile is requested, the in-memory cache and revision tree are
+// consulted before falling back to the KV store.
+func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) interface{} {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+
+ log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ var branch *Branch
+ var rev Revision
+
+ // Fall back to the committed (NONE) branch when no transaction branch exists
+ if branch = n.GetBranch(txid); txid == "" || branch == nil {
+ branch = n.GetBranch(NONE)
+ }
+
+ if hash != "" {
+ rev = branch.GetRevision(hash)
+ } else {
+ rev = branch.GetLatest()
+ }
+
+ var result interface{}
+
+ // If there is no request to reconcile, try to get it from memory
+ if !reconcile {
+ // Try to find an entry matching the path value from one of these sources
+ // 1. Start with the cache which stores revisions by watch names
+ // 2. Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
+ // 3. Move on to the KV store if that path cannot be found or if the entry has expired
+ if entry, exists := GetRevCache().Get(path); exists && entry.(Revision) != nil {
+ // Cache entries older than DataRefreshPeriod (ms) are considered stale
+ entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
+ if entryAge < DataRefreshPeriod {
+ log.Debugw("using-cache-entry", log.Fields{
+ "path": path,
+ "hash": hash,
+ "age": entryAge,
+ })
+ // Return a clone so callers cannot mutate the cached message
+ return proto.Clone(entry.(Revision).GetData().(proto.Message))
+ } else {
+ log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
+ }
+ } else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ log.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
+ return result
+ } else {
+ log.Debugw("not-using-cache-entry", log.Fields{
+ "path": path,
+ "hash": hash, "depth": depth,
+ "reconcile": reconcile,
+ "txid": txid,
+ })
+ }
+ } else {
+ log.Debugw("reconcile-requested", log.Fields{
+ "path": path,
+ "hash": hash,
+ "reconcile": reconcile,
+ })
+ }
+
+ // If we got to this point, we are either trying to reconcile with the db
+ // or we simply failed at getting information from memory
+ if n.Root.KvStore != nil {
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil && len(pr) > 0 {
+ // Did we receive a single or multiple revisions?
+ if len(pr) > 1 {
+ var revs []interface{}
+ for _, revEntry := range pr {
+ revs = append(revs, revEntry.GetData())
+ }
+ result = revs
+ } else {
+ result = pr[0].GetData()
+ }
+ }
+ }
+
+ return result
+}
+
+// getPath traverses the specified path and retrieves the data associated to it
+func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
+ // Empty path means this revision itself is the target
+ if path == "" {
+ return n.getData(rev, depth)
+ }
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ names := ChildrenFields(n.Type)
+ field := names[name]
+
+ if field != nil && field.IsContainer {
+ // Work on a copy of the child list to avoid mutating the revision
+ children := make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ if field.Key != "" {
+ if path != "" {
+ // A key follows the container name; descend into that child
+ partition = strings.SplitN(path, "/", 2)
+ key := partition[0]
+ path = ""
+ keyValue := field.KeyFromStr(key)
+ if _, childRev := n.findRevByKey(children, field.Key, keyValue); childRev == nil {
+ return nil
+ } else {
+ childNode := childRev.GetNode()
+ return childNode.getPath(ctx, childRev, path, depth)
+ }
+ } else {
+ // No key given: return the data of every child
+ var response []interface{}
+ for _, childRev := range children {
+ childNode := childRev.GetNode()
+ value := childNode.getData(childRev, depth)
+ response = append(response, value)
+ }
+ return response
+ }
+ } else {
+ var response []interface{}
+ if path != "" {
+ // TODO: raise error
+ return response
+ }
+ for _, childRev := range children {
+ childNode := childRev.GetNode()
+ value := childNode.getData(childRev, depth)
+ response = append(response, value)
+ }
+ return response
+ }
+ } else if children := rev.GetChildren(name); children != nil && len(children) > 0 {
+ // Non-container child: descend into its single revision
+ childRev := children[0]
+ childNode := childRev.GetNode()
+ return childNode.getPath(ctx, childRev, path, depth)
+ }
+
+ return nil
+}
+
+// getData retrieves the data from a node revision, optionally filtered by the
+// proxy's GET callbacks
+func (n *node) getData(rev Revision, depth int) interface{} {
+ msg := rev.GetBranch().GetLatest().Get(depth)
+ var modifiedMsg interface{}
+
+ if n.GetProxy() != nil {
+ log.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
+ // A GET callback may return a modified copy of the data
+ if modifiedMsg = n.GetProxy().InvokeCallbacks(GET, false, msg); modifiedMsg != nil {
+ msg = modifiedMsg
+ }
+
+ }
+
+ return msg
+}
+
+// Update changes the content of a node at the specified path with the provided data
+func (n *node) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+
+ log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid})
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+
+ // Select (or lazily create) the branch matching the transaction id
+ var branch *Branch
+ if txid == "" {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); branch == nil {
+ branch = makeBranch(n)
+ }
+
+ if branch.GetLatest() != nil {
+ log.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
+ }
+ // Empty path: this node itself is the update target
+ if path == "" {
+ return n.doUpdate(ctx, branch, data, strict)
+ }
+
+ rev := branch.GetLatest()
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(n.Type)[name]
+ var children []Revision
+
+ if field == nil {
+ return n.doUpdate(ctx, branch, data, strict)
+ }
+
+ if field.IsContainer {
+ if path == "" {
+ log.Errorf("cannot update a list")
+ } else if field.Key != "" {
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+
+ if childRev == nil {
+ log.Debugw("child-revision-is-nil", log.Fields{"key": keyValue})
+ return branch.GetLatest()
+ }
+
+ childNode := childRev.GetNode()
+
+ // Save proxy in child node to ensure callbacks are called later on
+ // only assign in cases of non sub-folder proxies, i.e. "/"
+ if childNode.Proxy == nil && n.Proxy != nil && n.GetProxy().getFullPath() == "" {
+ childNode.Proxy = n.Proxy
+ }
+
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
+
+ if newChildRev.GetHash() == childRev.GetHash() {
+ if newChildRev != childRev {
+ // NOTE(review): log.Debug is passed printf-style arguments;
+ // log.Debugf was probably intended here - confirm
+ log.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
+ newChildRev.ClearHash()
+ }
+ log.Debugw("child-revisions-have-matching-hash", log.Fields{"hash": childRev.GetHash(), "key": keyValue})
+ return branch.GetLatest()
+ }
+
+ _, newKey := GetAttributeValue(newChildRev.GetData(), field.Key, 0)
+
+ // Compare string forms: an update must not change the key field
+ _newKeyType := fmt.Sprintf("%s", newKey)
+ _keyValueType := fmt.Sprintf("%s", keyValue)
+
+ if _newKeyType != _keyValueType {
+ log.Errorf("cannot change key field")
+ }
+
+ // Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
+ newChildRev.SetName(name + "/" + _keyValueType)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
+
+ n.makeLatest(branch, updatedRev, nil)
+ updatedRev.ChildDrop(name, childRev.GetHash())
+
+ return newChildRev
+
+ } else {
+ log.Errorf("cannot index into container with no keys")
+ }
+ } else {
+ // Non-container child: recurse into its single revision
+ childRev := rev.GetChildren(name)[0]
+ childNode := childRev.GetNode()
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ updatedRev := rev.UpdateChildren(ctx, name, []Revision{newChildRev}, branch)
+ n.makeLatest(branch, updatedRev, nil)
+
+ updatedRev.ChildDrop(name, childRev.GetHash())
+
+ return newChildRev
+ }
+
+ return nil
+}
+
+// doUpdate applies new data to the latest revision of this node's branch.
+// It validates that the provided data matches the node's declared type,
+// invokes PRE_UPDATE callbacks, and - when the data actually differs -
+// creates and promotes a new revision, returning it along with a POST_UPDATE
+// announcement. When nothing changed the current latest revision is returned.
+func (n *node) doUpdate(ctx context.Context, branch *Branch, data interface{}, strict bool) Revision {
+ log.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
+
+ if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
+ // TODO raise error
+ // Fixed: the message previously embedded a stray "%+v" format verb even
+ // though Errorw takes a plain message plus structured fields.
+ log.Errorw("types-do-not-match", log.Fields{"actual": reflect.TypeOf(data), "expected": n.Type})
+ return nil
+ }
+
+ // TODO: validate that this actually works
+ //if n.hasChildren(data) {
+ // return nil
+ //}
+
+ if n.GetProxy() != nil {
+ log.Debug("invoking proxy PRE_UPDATE Callbacks")
+ n.GetProxy().InvokeCallbacks(PRE_UPDATE, false, branch.GetLatest(), data)
+ }
+
+ // Only create a new revision when the serialized data actually differs
+ if branch.GetLatest().GetData().(proto.Message).String() != data.(proto.Message).String() {
+ if strict {
+ // TODO: checkAccessViolations(data, Branch.GetLatest.data)
+ log.Debugf("checking access violations")
+ }
+
+ rev := branch.GetLatest().UpdateData(ctx, data, branch)
+ changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
+ n.makeLatest(branch, rev, changes)
+
+ return rev
+ }
+ return branch.GetLatest()
+}
+
+// Add inserts a new node at the specified path with the provided data
+func (n *node) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+
+ log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid})
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+ if path == "" {
+ // TODO raise error
+ log.Errorf("cannot add for non-container mode")
+ return nil
+ }
+
+ // Select (or lazily create) the branch matching the transaction id
+ var branch *Branch
+ if txid == "" {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); branch == nil {
+ branch = makeBranch(n)
+ }
+
+ rev := branch.GetLatest()
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(n.Type)[name]
+
+ var children []Revision
+
+ if field.IsContainer {
+ if path == "" {
+ // The container itself is the target: append a new keyed child
+ if field.Key != "" {
+ if n.GetProxy() != nil {
+ log.Debug("invoking proxy PRE_ADD Callbacks")
+ n.GetProxy().InvokeCallbacks(PRE_ADD, false, data)
+ }
+
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ _, key := GetAttributeValue(data, field.Key, 0)
+
+ if _, exists := n.findRevByKey(children, field.Key, key.String()); exists != nil {
+ // TODO raise error
+ log.Warnw("duplicate-key-found", log.Fields{"key": key.String()})
+ return exists
+ }
+ childRev := n.MakeNode(data, "").Latest()
+
+ // Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
+ childRev.SetName(name + "/" + key.String())
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ children = append(children, childRev)
+
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
+ changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
+ childRev.SetupWatch(childRev.GetName())
+
+ n.makeLatest(branch, updatedRev, changes)
+
+ return childRev
+ }
+ log.Errorf("cannot add to non-keyed container")
+
+ } else if field.Key != "" {
+ // A key follows the container name: recurse into that child
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+
+ if childRev == nil {
+ return branch.GetLatest()
+ }
+
+ childNode := childRev.GetNode()
+ newChildRev := childNode.Add(ctx, path, data, txid, makeBranch)
+
+ // Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
+ // NOTE(review): keyValue.(string) panics if the key field is not a
+ // string type - confirm all keyed children here use string keys
+ newChildRev.SetName(name + "/" + keyValue.(string))
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
+ n.makeLatest(branch, updatedRev, nil)
+
+ updatedRev.ChildDrop(name, childRev.GetHash())
+
+ return newChildRev
+ } else {
+ log.Errorf("cannot add to non-keyed container")
+ }
+ } else {
+ log.Errorf("cannot add to non-container field")
+ }
+
+ return nil
+}
+
+// Remove eliminates a node at the specified path
+func (n *node) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+
+ log.Debugw("node-remove-request", log.Fields{"path": path, "txid": txid, "makeBranch": makeBranch})
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+ if path == "" {
+ // TODO raise error
+ log.Errorf("cannot remove for non-container mode")
+ }
+ // Select (or lazily create) the branch matching the transaction id
+ var branch *Branch
+ if txid == "" {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); branch == nil {
+ branch = makeBranch(n)
+ }
+
+ rev := branch.GetLatest()
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(n.Type)[name]
+ var children []Revision
+ postAnnouncement := []ChangeTuple{}
+
+ if field.IsContainer {
+ if path == "" {
+ log.Errorw("cannot-remove-without-key", log.Fields{"name": name, "key": path})
+ } else if field.Key != "" {
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ keyValue := field.KeyFromStr(key)
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ // A residual path means the target lives deeper; recurse into the child
+ if path != "" {
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ childNode := childRev.GetNode()
+ if childNode.Proxy == nil {
+ childNode.Proxy = n.Proxy
+ }
+ newChildRev := childNode.Remove(ctx, path, txid, makeBranch)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ rev.SetChildren(name, children)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, nil)
+ }
+ return branch.GetLatest()
+ }
+
+ // No residual path: this node's keyed child is the removal target
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil && idx >= 0 {
+ if n.GetProxy() != nil {
+ data := childRev.GetData()
+ n.GetProxy().InvokeCallbacks(PRE_REMOVE, false, data)
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data, nil})
+ } else {
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData(), nil})
+ }
+
+ childRev.StorageDrop(txid, true)
+ GetRevCache().Delete(childRev.GetName())
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ children = append(children[:idx], children[idx+1:]...)
+ rev.SetChildren(name, children)
+
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, postAnnouncement)
+
+ return rev
+ } else {
+ // NOTE(review): keyValue.(string) panics for non-string keys - confirm
+ log.Errorw("failed-to-find-revision", log.Fields{"name": name, "key": keyValue.(string)})
+ }
+ }
+ log.Errorw("cannot-add-to-non-keyed-container", log.Fields{"name": name, "path": path, "fieldKey": field.Key})
+
+ } else {
+ log.Errorw("cannot-add-to-non-container-field", log.Fields{"name": name, "path": path})
+ }
+
+ return nil
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+// MakeBranchFunction is a type for function references intended to create a branch
+type MakeBranchFunction func(*node) *Branch
+
+// MakeBranch creates a new branch for the provided transaction id
+func (n *node) MakeBranch(txid string) *Branch {
+ // New branches always fork from the latest revision of the NONE branch
+ branchPoint := n.GetBranch(NONE).GetLatest()
+ branch := NewBranch(n, txid, branchPoint, true)
+ n.SetBranch(txid, branch)
+ return branch
+}
+
+// DeleteBranch removes a branch with the specified id
+func (n *node) DeleteBranch(txid string) {
+ delete(n.Branches, txid)
+}
+
+// mergeChild returns a function that merges a child revision's transaction
+// branch (when it matches txid) into its node's committed branch
+func (n *node) mergeChild(txid string, dryRun bool) func(Revision) Revision {
+ f := func(rev Revision) Revision {
+ childBranch := rev.GetBranch()
+
+ if childBranch.Txid == txid {
+ rev, _ = childBranch.Node.MergeBranch(txid, dryRun)
+ }
+
+ return rev
+ }
+ return f
+}
+
+// MergeBranch will integrate the contents of a transaction branch within the latest branch of a given node
+func (n *node) MergeBranch(txid string, dryRun bool) (Revision, error) {
+ srcBranch := n.GetBranch(txid)
+ dstBranch := n.GetBranch(NONE)
+
+ forkRev := srcBranch.Origin
+ srcRev := srcBranch.GetLatest()
+ dstRev := dstBranch.GetLatest()
+
+ // Three-way merge between the fork point, the transaction branch head and
+ // the committed branch head; children are merged recursively via mergeChild
+ rev, changes := Merge3Way(forkRev, srcRev, dstRev, n.mergeChild(txid, dryRun), dryRun)
+
+ if !dryRun {
+ if rev != nil {
+ rev.SetName(dstRev.GetName())
+ n.makeLatest(dstBranch, rev, changes)
+ }
+ // The transaction branch is consumed by the merge
+ n.DeleteBranch(txid)
+ }
+
+ // TODO: return proper error when one occurs
+ return rev, nil
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+//func (n *node) diff(hash1, hash2, txid string) {
+// branch := n.Branches[txid]
+// rev1 := branch.GetHash(hash1)
+// rev2 := branch.GetHash(hash2)
+//
+// if rev1.GetHash() == rev2.GetHash() {
+// // empty patch
+// } else {
+// // translate data to json and generate patch
+// patch, err := jsonpatch.MakePatch(rev1.GetData(), rev2.GetData())
+// patch.
+// }
+//}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tag utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+// TODO: is tag mgmt used in the python implementation? Need to validate
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+// hasChildren reports whether the given data carries any populated child
+// fields; updating such external children directly is not allowed
+func (n *node) hasChildren(data interface{}) bool {
+ for fieldName, field := range ChildrenFields(n.Type) {
+ _, fieldValue := GetAttributeValue(data, fieldName, 0)
+
+ // NOTE(review): fieldValue.IsNil() panics for non-nilable kinds;
+ // assumes child fields are pointers/slices/maps - confirm
+ if (field.IsContainer && fieldValue.Len() > 0) || !fieldValue.IsNil() {
+ log.Error("cannot update external children")
+ return true
+ }
+ }
+
+ return false
+}
+
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+// CreateProxy returns a reference to a sub-tree of the data model
+func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+ // The initial call uses the same value for path and fullPath
+ return n.createProxy(ctx, path, path, n, exclusive)
+}
+
+// createProxy walks the tree toward the requested path and attaches a proxy
+// at the terminal node; fullPath is preserved unchanged for KV lookups
+func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+ log.Debugw("node-create-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
+
+ for strings.HasPrefix(path, "/") {
+ path = path[1:]
+ }
+ // Empty path: this node is the proxy target
+ if path == "" {
+ return n.makeProxy(path, fullPath, parentNode, exclusive)
+ }
+
+ rev := n.GetBranch(NONE).GetLatest()
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+ var nodeType interface{}
+ if len(partition) < 2 {
+ path = ""
+ nodeType = n.Type
+ } else {
+ path = partition[1]
+ nodeType = parentNode.Type
+ }
+
+ field := ChildrenFields(nodeType)[name]
+
+ if field != nil {
+ if field.IsContainer {
+ log.Debugw("container-field", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
+ if path == "" {
+ log.Debugw("folder-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ // Proxy over the whole container: back it with a fresh zero-value node
+ newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
+ return newNode.makeProxy(path, fullPath, parentNode, exclusive)
+ } else if field.Key != "" {
+ log.Debugw("key-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+ var children []Revision
+ children = make([]Revision, len(rev.GetChildren(name)))
+ copy(children, rev.GetChildren(name))
+
+ // Resolve the keyed child from memory first, then from the KV store
+ var childRev Revision
+ if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ log.Debugw("found-revision-matching-key-in-memory", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); revs != nil && len(revs) > 0 {
+ log.Debugw("found-revision-matching-key-in-db", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ childRev = revs[0]
+ } else {
+ log.Debugw("no-revision-matching-key", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ }
+ if childRev != nil {
+ childNode := childRev.GetNode()
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
+ }
+ } else {
+ log.Errorw("cannot-access-index-of-empty-container", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
+ }
+ } else {
+ log.Debugw("non-container-field", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "name": name,
+ })
+ childRev := rev.GetChildren(name)[0]
+ childNode := childRev.GetNode()
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
+ }
+ } else {
+ log.Debugw("field-object-is-nil", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "fullPath": fullPath,
+ "name": name,
+ })
+ }
+
+ log.Warnw("cannot-create-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ "latest-rev": rev.GetHash(),
+ })
+ return nil
+}
+
+// makeProxy returns the node's proxy, constructing one (with a root view
+// scoped to this node) if none exists yet
+func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+ log.Debugw("node-make-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
+
+ // Build a root view anchored at this node but sharing the real root's state
+ r := &root{
+ node: n,
+ Callbacks: n.Root.GetCallbacks(),
+ NotificationCallbacks: n.Root.GetNotificationCallbacks(),
+ DirtyNodes: n.Root.DirtyNodes,
+ KvStore: n.Root.KvStore,
+ Loading: n.Root.Loading,
+ RevisionClass: n.Root.RevisionClass,
+ }
+
+ if n.Proxy == nil {
+ log.Debugw("constructing-new-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.Type).Type(),
+ "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
+ "path": path,
+ "fullPath": fullPath,
+ })
+ n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
+ } else {
+ log.Debugw("node-has-existing-proxy", log.Fields{
+ "node-type": reflect.ValueOf(n.GetProxy().Node.Type).Type(),
+ "parent-node-type": reflect.ValueOf(n.GetProxy().ParentNode.Type).Type(),
+ "path": n.GetProxy().Path,
+ "fullPath": n.GetProxy().FullPath,
+ })
+ if n.GetProxy().Exclusive {
+ log.Error("node is already owned exclusively")
+ }
+ }
+
+ return n.Proxy
+}
+
+// makeEventBus lazily creates the node's event bus
+func (n *node) makeEventBus() *EventBus {
+ if n.EventBus == nil {
+ n.EventBus = NewEventBus()
+ }
+ return n.EventBus
+}
+
+// SetProxy attaches a proxy to this node
+func (n *node) SetProxy(proxy *Proxy) {
+ n.Proxy = proxy
+}
+
+// GetProxy returns the proxy attached to this node, if any
+func (n *node) GetProxy() *Proxy {
+ return n.Proxy
+}
+
+// GetBranch returns the branch associated with the given transaction id, or nil
+func (n *node) GetBranch(key string) *Branch {
+ if n.Branches != nil {
+ if branch, exists := n.Branches[key]; exists {
+ return branch
+ }
+ }
+ return nil
+}
+
+// SetBranch associates a branch with the given transaction id
+func (n *node) SetBranch(key string, branch *Branch) {
+ n.Branches[key] = branch
+}
+
+// GetRoot returns the root of the tree this node belongs to
+func (n *node) GetRoot() *root {
+ return n.Root
+}
+
+// SetRoot changes the root reference of this node
+func (n *node) SetRoot(root *root) {
+ n.Root = root
+}
diff --git a/db/model/node_test.go b/db/model/node_test.go
new file mode 100644
index 0000000..7e0a3ce
--- /dev/null
+++ b/db/model/node_test.go
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "crypto/md5"
+ "fmt"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "reflect"
+ "testing"
+)
+
var (
	// TestNode_Port is a one-element port list used to populate the device fixture.
	TestNode_Port = []*voltha.Port{
		{
			PortNo:     123,
			Label:      "test-etcd_port-0",
			Type:       voltha.Port_PON_OLT,
			AdminState: common.AdminState_ENABLED,
			OperStatus: common.OperStatus_ACTIVE,
			DeviceId:   "etcd_port-0-device-id",
			Peers:      []*voltha.Port_PeerPort{},
		},
	}

	// TestNode_Device is a fully-populated simulated OLT device used as node data.
	TestNode_Device = &voltha.Device{
		Id:              "Config-SomeNode-01-new-test",
		Type:            "simulated_olt",
		Root:            true,
		ParentId:        "",
		ParentPortNo:    0,
		Vendor:          "voltha-test",
		Model:           "GetLatest-voltha-simulated-olt",
		HardwareVersion: "1.0.0",
		FirmwareVersion: "1.0.0",
		Images:          &voltha.Images{},
		SerialNumber:    "abcdef-123456",
		VendorId:        "DEADBEEF-INC",
		Adapter:         "simulated_olt",
		Vlan:            1234,
		Address:         &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
		ExtraArgs:       "",
		ProxyAddress:    &voltha.Device_ProxyAddress{},
		AdminState:      voltha.AdminState_PREPROVISIONED,
		OperStatus:      common.OperStatus_ACTIVE,
		Reason:          "",
		ConnectStatus:   common.ConnectStatus_REACHABLE,
		Custom:          &any.Any{},
		Ports:           TestNode_Port,
		Flows:           &openflow_13.Flows{},
		FlowGroups:      &openflow_13.FlowGroups{},
		PmConfigs:       &voltha.PmConfigs{},
		ImageDownloads:  []*voltha.ImageDownload{},
	}

	// TestNode_Data aliases the device fixture as generic node data.
	TestNode_Data = TestNode_Device

	// TestNode_Txid is a deterministic transaction id (md5 of a fixed string).
	TestNode_Txid = fmt.Sprintf("%x", md5.Sum([]byte("node_transaction_id")))
	// TestNode_Root is configured to produce in-memory (non-persisted) revisions.
	TestNode_Root = &root{RevisionClass: reflect.TypeOf(NonPersistedRevision{})}
)
+
+// Exercise node creation code
+// This test will
+func TestNode_01_NewNode(t *testing.T) {
+ node := NewNode(TestNode_Root, TestNode_Data, false, TestNode_Txid)
+
+ if reflect.ValueOf(node.Type).Type() != reflect.TypeOf(TestNode_Data) {
+ t.Errorf("Node type does not match original data type: %+v", reflect.ValueOf(node.Type).Type())
+ } else if node.GetBranch(TestNode_Txid) == nil || node.GetBranch(TestNode_Txid).Latest == nil {
+ t.Errorf("No branch associated to txid: %s", TestNode_Txid)
+ } else if node.GetBranch(TestNode_Txid).Latest == nil {
+ t.Errorf("Branch has no latest revision : %s", TestNode_Txid)
+ } else if node.GetBranch(TestNode_Txid).GetLatest().GetConfig() == nil {
+ t.Errorf("Latest revision has no assigned data: %+v", node.GetBranch(TestNode_Txid).GetLatest())
+ }
+
+ t.Logf("Created new node successfully : %+v\n", node)
+}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
new file mode 100644
index 0000000..384caed
--- /dev/null
+++ b/db/model/non_persisted_revision.go
@@ -0,0 +1,514 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+)
+
// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
//
// revCacheSingleton is a process-wide cache mapping revision paths to revisions.
// NOTE(review): the embedded RWMutex is not used by the methods below —
// sync.Map is already safe for concurrent use. Confirm no external caller
// locks it before removing.
type revCacheSingleton struct {
	sync.RWMutex
	Cache sync.Map
}

// Get returns the cached value for path and whether it was present.
func (s *revCacheSingleton) Get(path string) (interface{}, bool) {
	return s.Cache.Load(path)
}
// Set stores value under path, replacing any existing entry.
func (s *revCacheSingleton) Set(path string, value interface{}) {
	s.Cache.Store(path, value)
}
// Delete removes the entry for path, if any.
func (s *revCacheSingleton) Delete(path string) {
	s.Cache.Delete(path)
}

var revCacheInstance *revCacheSingleton
var revCacheOnce sync.Once

// GetRevCache returns the lazily-initialized singleton revision cache.
func GetRevCache() *revCacheSingleton {
	revCacheOnce.Do(func() {
		revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
	})
	return revCacheInstance
}
+
// NonPersistedRevision is an in-memory implementation of the Revision
// interface: a snapshot of config data plus named child revision lists,
// identified by a content hash.
type NonPersistedRevision struct {
	mutex        sync.RWMutex // guards scalar fields (Config, Hash, Branch, lastUpdate)
	Root         *root
	Config       *DataRevision
	childrenLock sync.RWMutex // guards the Children map separately from mutex
	Children     map[string][]Revision
	Hash         string // content hash computed by hashContent()
	Branch       *Branch
	WeakRef      string
	Name         string
	lastUpdate   time.Time // when the revision was last written or refreshed
}
+
+func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ r := &NonPersistedRevision{}
+ r.Root = root
+ r.Branch = branch
+ r.Config = NewDataRevision(root, data)
+ r.Children = children
+ r.Hash = r.hashContent()
+ return r
+}
+
+func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+ npr.Config = config
+}
+
+func (npr *NonPersistedRevision) GetConfig() *DataRevision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+ return npr.Config
+}
+
+func (npr *NonPersistedRevision) SetAllChildren(children map[string][]Revision) {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+ npr.Children = make(map[string][]Revision)
+
+ for key, value := range children {
+ npr.Children[key] = make([]Revision, len(value))
+ copy(npr.Children[key], value)
+ }
+}
+
+func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ npr.Children[name] = make([]Revision, len(children))
+ copy(npr.Children[name], children)
+}
+
+func (npr *NonPersistedRevision) GetAllChildren() map[string][]Revision {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ return npr.Children
+}
+
+func (npr *NonPersistedRevision) GetChildren(name string) []Revision {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ if _, exists := npr.Children[name]; exists {
+ return npr.Children[name]
+ }
+ return nil
+}
+
// SetHash overrides the revision's computed content hash.
func (npr *NonPersistedRevision) SetHash(hash string) {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	npr.Hash = hash
}

// GetHash returns the revision's content hash.
// NOTE(review): locking here is deliberately commented out; confirm no caller
// races a concurrent SetHash/ClearHash before re-enabling.
func (npr *NonPersistedRevision) GetHash() string {
	//npr.mutex.Lock()
	//defer npr.mutex.Unlock()
	return npr.Hash
}

// ClearHash resets the content hash to the empty string.
func (npr *NonPersistedRevision) ClearHash() {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	npr.Hash = ""
}

// GetName returns the revision's name (typically its KV-store path).
func (npr *NonPersistedRevision) GetName() string {
	//npr.mutex.Lock()
	//defer npr.mutex.Unlock()
	return npr.Name
}

// SetName sets the revision's name (typically its KV-store path).
func (npr *NonPersistedRevision) SetName(name string) {
	//npr.mutex.Lock()
	//defer npr.mutex.Unlock()
	npr.Name = name
}
// SetBranch associates the revision with the given branch.
func (npr *NonPersistedRevision) SetBranch(branch *Branch) {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	npr.Branch = branch
}

// GetBranch returns the branch this revision belongs to.
func (npr *NonPersistedRevision) GetBranch() *Branch {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	return npr.Branch
}

// GetData returns the revision's raw config data, or nil when no config is set.
func (npr *NonPersistedRevision) GetData() interface{} {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	if npr.Config == nil {
		return nil
	}
	return npr.Config.Data
}

// GetNode returns the node owning this revision's branch.
func (npr *NonPersistedRevision) GetNode() *node {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()
	return npr.Branch.Node
}
+
// Finalize recomputes the revision hash from its current content.
// The skipOnExist flag is ignored here; it only matters for the persisted
// revision implementation.
func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
	npr.Hash = npr.hashContent()
}
+
+// hashContent generates a hash string based on the contents of the revision.
+// The string should be unique to avoid conflicts with other revisions
+func (npr *NonPersistedRevision) hashContent() string {
+ var buffer bytes.Buffer
+ var childrenKeys []string
+
+ if npr.Config != nil {
+ buffer.WriteString(npr.Config.Hash)
+ }
+
+ if npr.Name != "" {
+ buffer.WriteString(npr.Name)
+ }
+
+ for key := range npr.Children {
+ childrenKeys = append(childrenKeys, key)
+ }
+
+ sort.Strings(childrenKeys)
+
+ if len(npr.Children) > 0 {
+ // Loop through sorted Children keys
+ for _, key := range childrenKeys {
+ for _, child := range npr.Children[key] {
+ if child != nil && child.GetHash() != "" {
+ buffer.WriteString(child.GetHash())
+ }
+ }
+ }
+ }
+
+ return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
+}
+
// Get will retrieve the data for the current revision.
// It returns a clone of the latest revision's data; when depth != 0 it also
// merges child revision data into the clone via reflection.
func (npr *NonPersistedRevision) Get(depth int) interface{} {
	// 1. Clone the data to avoid any concurrent access issues
	// 2. The current rev might still be pointing to an old config
	// thus, force the revision to get its latest value
	latestRev := npr.GetBranch().GetLatest()
	originalData := proto.Clone(latestRev.GetData().(proto.Message))
	data := originalData

	if depth != 0 {
		// FIXME: Traversing the struct through reflection sometimes corrupts the data.
		// Unlike the original python implementation, golang structs are not lazy loaded.
		// Keeping this non-critical logic for now, but Get operations should be forced to
		// depth=0 to avoid going through the following loop.
		for fieldName, field := range ChildrenFields(latestRev.GetData()) {
			childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
			if field.IsContainer {
				// Repeated field: gather each child's data, skipping entries
				// already present in the cloned holder.
				for _, rev := range latestRev.GetChildren(fieldName) {
					childData := rev.Get(depth - 1)
					foundEntry := false
					for i := 0; i < childDataHolder.Len(); i++ {
						cdh_if := childDataHolder.Index(i).Interface()
						// Equality via proto string form — structural compare.
						if cdh_if.(proto.Message).String() == childData.(proto.Message).String() {
							foundEntry = true
							break
						}
					}
					if !foundEntry {
						// avoid duplicates by adding it only if the child was not found in the holder
						childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
					}
				}
			} else {
				// Singleton field: only the first child revision (if any) applies.
				if revs := npr.GetBranch().GetLatest().GetChildren(fieldName); revs != nil && len(revs) > 0 {
					rev := revs[0]
					if rev != nil {
						childData := rev.Get(depth - 1)
						// Only assign when types match, to avoid reflection panics.
						if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
							childDataHolder = reflect.ValueOf(childData)
						}
					}
				}
			}
			// Merge child data with cloned object
			reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
		}
	}

	result := data

	if result != nil {
		// We need to send back a copy of the retrieved object
		result = proto.Clone(data.(proto.Message))
	}

	return result
}
+
// UpdateData will refresh the data content of the revision.
// It returns the receiver unchanged when the new data hashes identically to
// the current config; otherwise it returns a brand-new revision that inherits
// this revision's identity and the branch's latest children.
func (npr *NonPersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()

	log.Debugw("update-data", log.Fields{"hash": npr.GetHash(), "current": npr.Config.Data, "provided": data})

	// Do not update the revision if data is the same
	if npr.Config.Data != nil && npr.Config.hashData(npr.Root, data) == npr.Config.Hash {
		log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
		return npr
	}

	// Construct a new revision based on the current one
	newRev := NonPersistedRevision{}
	newRev.Config = NewDataRevision(npr.Root, data)
	newRev.Hash = npr.Hash
	newRev.Root = npr.Root
	newRev.Name = npr.Name
	newRev.Branch = branch
	newRev.lastUpdate = npr.lastUpdate

	// Children are copied from the branch's latest revision, not from the
	// receiver, so concurrent child updates are not lost.
	newRev.Children = make(map[string][]Revision)
	for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
		newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
	}

	// Recompute the hash now that config and children are in place.
	newRev.Finalize(false)

	log.Debugw("update-data-complete", log.Fields{"updated": newRev.Config.Data, "provided": data})

	return &newRev
}
+
// UpdateChildren will refresh the list of children with the provided ones.
// It will carefully go through the list and ensure that no child is lost:
// existing children absent from the new list are retained, and children that
// exist in both are only replaced when their data actually differs.
// Returns a new revision; the receiver is not modified.
func (npr *NonPersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
	npr.mutex.Lock()
	defer npr.mutex.Unlock()

	// Construct a new revision based on the current one
	updatedRev := &NonPersistedRevision{}
	updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
	updatedRev.Hash = npr.Hash
	updatedRev.Branch = branch
	updatedRev.Name = npr.Name
	updatedRev.lastUpdate = npr.lastUpdate

	// Start from the branch's latest children so concurrent updates to other
	// child types are preserved.
	updatedRev.Children = make(map[string][]Revision)
	for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
		updatedRev.Children[entryName] = append(updatedRev.Children[entryName], childrenEntry...)
	}

	var updatedChildren []Revision

	// Verify if the map already contains an entry matching the name value
	// If so, we need to retain the contents of that entry and merge them with the provided children revision list
	if existingChildren := branch.GetLatest().GetChildren(name); existingChildren != nil {
		// Construct a map of unique child names with the respective index value
		// for the children in the existing revision as well as the new ones
		existingNames := make(map[string]int)
		newNames := make(map[string]int)

		for i, newChild := range children {
			newNames[newChild.GetName()] = i
		}

		for i, existingChild := range existingChildren {
			existingNames[existingChild.GetName()] = i

			// If an existing entry is not in the new list, add it to the updated list, so it is not forgotten
			if _, exists := newNames[existingChild.GetName()]; !exists {
				updatedChildren = append(updatedChildren, existingChild)
			}
		}

		log.Debugw("existing-children-names", log.Fields{"hash": npr.GetHash(), "names": existingNames})

		// Merge existing and new children
		for _, newChild := range children {
			nameIndex, nameExists := existingNames[newChild.GetName()]

			// Does the existing list contain a child with that name?
			if nameExists {
				// Check if the data has changed or not (compare proto string forms)
				if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
					log.Debugw("replacing-existing-child", log.Fields{
						"old-hash": existingChildren[nameIndex].GetHash(),
						"old-data": existingChildren[nameIndex].GetData(),
						"new-hash": newChild.GetHash(),
						"new-data": newChild.GetData(),
					})

					// replace entry, carrying over the root from the old child's node
					newChild.GetNode().SetRoot(existingChildren[nameIndex].GetNode().GetRoot())
					updatedChildren = append(updatedChildren, newChild)
				} else {
					log.Debugw("keeping-existing-child", log.Fields{
						"old-hash": existingChildren[nameIndex].GetHash(),
						"old-data": existingChildren[nameIndex].GetData(),
						"new-hash": newChild.GetHash(),
						"new-data": newChild.GetData(),
					})

					// keep existing entry
					updatedChildren = append(updatedChildren, existingChildren[nameIndex])
				}
			} else {
				log.Debugw("adding-unknown-child", log.Fields{
					"hash": newChild.GetHash(),
					"data": newChild.GetData(),
				})

				// new entry ... just add it
				updatedChildren = append(updatedChildren, newChild)
			}
		}

		// Save children in new revision
		updatedRev.SetChildren(name, updatedChildren)

		updatedNames := make(map[string]int)
		for i, updatedChild := range updatedChildren {
			updatedNames[updatedChild.GetName()] = i
		}

		log.Debugw("updated-children-names", log.Fields{"hash": npr.GetHash(), "names": updatedNames})

	} else {
		// There are no children available, just save the provided ones
		updatedRev.SetChildren(name, children)
	}

	// Recompute the hash now that the children have changed.
	updatedRev.Finalize(false)

	return updatedRev
}
+
+// UpdateAllChildren will replace the current list of children with the provided ones
+func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ newRev := npr
+ newRev.Config = npr.Config
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
+ newRev.Name = npr.Name
+ newRev.lastUpdate = npr.lastUpdate
+
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range children {
+ newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
+ }
+ newRev.Finalize(false)
+
+ return newRev
+}
+
// Drop is used to indicate when a revision is no longer required.
// For in-memory revisions there is nothing to release, so this only logs;
// the persisted implementation overrides the storage part via StorageDrop.
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
	log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "name": npr.GetName()})
}
+
+// ChildDrop will remove a child entry matching the provided parameters from the current revision
+func (npr *NonPersistedRevision) ChildDrop(childType string, childHash string) {
+ if childType != "" {
+ children := make([]Revision, len(npr.GetChildren(childType)))
+ copy(children, npr.GetChildren(childType))
+ for i, child := range children {
+ if child.GetHash() == childHash {
+ children = append(children[:i], children[i+1:]...)
+ npr.SetChildren(childType, children)
+ break
+ }
+ }
+ }
+}
+
+/// ChildDropByName will remove a child entry matching the type and name
+func (npr *NonPersistedRevision) ChildDropByName(childName string) {
+ // Extract device type
+ parts := strings.SplitN(childName, "/", 2)
+ childType := parts[0]
+
+ if childType != "" {
+ children := make([]Revision, len(npr.GetChildren(childType)))
+ copy(children, npr.GetChildren(childType))
+ for i, child := range children {
+ if child.GetName() == childName {
+ children = append(children[:i], children[i+1:]...)
+ npr.SetChildren(childType, children)
+ break
+ }
+ }
+ }
+}
+
+func (npr *NonPersistedRevision) SetLastUpdate(ts ...time.Time) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ if ts != nil && len(ts) > 0 {
+ npr.lastUpdate = ts[0]
+ } else {
+ npr.lastUpdate = time.Now()
+ }
+}
+
+func (npr *NonPersistedRevision) GetLastUpdate() time.Time {
+ npr.mutex.RLock()
+ defer npr.mutex.RUnlock()
+
+ return npr.lastUpdate
+}
+
// LoadFromPersistence is a stub: only persisted revisions can be reloaded
// from the KV store. Present to satisfy the Revision interface.
func (npr *NonPersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
	// stub... required by interface
	return nil
}

// SetupWatch is a stub: in-memory revisions have no KV entry to watch.
func (npr *NonPersistedRevision) SetupWatch(key string) {
	// stub ... required by interface
}

// StorageDrop is a stub: in-memory revisions have no persisted state to drop.
func (npr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
	// stub ... required by interface
}

// getVersion always reports -1, meaning "no persisted version".
func (npr *NonPersistedRevision) getVersion() int64 {
	return -1
}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
new file mode 100644
index 0000000..b5f1d09
--- /dev/null
+++ b/db/model/persisted_revision.go
@@ -0,0 +1,626 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/db"
+ "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+ "strings"
+ "sync"
+)
+
// PersistedRevision holds information of revision meant to be saved in a persistent storage.
// It wraps a (non-persisted) Revision and adds KV-store persistence, optional
// gzip compression and watch-based synchronization with the store.
type PersistedRevision struct {
	Revision
	Compress bool

	events       chan *kvstore.Event // watch event stream from the KV store
	kvStore      *db.Backend
	mutex        sync.RWMutex
	versionMutex sync.RWMutex // guards Version separately from mutex
	Version      int64        // last version observed in the KV store
	isStored     bool         // true when the current blob is persisted
	isWatched    bool         // true while a watch goroutine is running
}

// watchCache tracks which key/revision pairs already have an active watch,
// so SetupWatch is idempotent.
type watchCache struct {
	Cache sync.Map
}

var watchCacheInstance *watchCache
var watchCacheOne sync.Once

// Watches returns the lazily-initialized singleton watch cache.
func Watches() *watchCache {
	watchCacheOne.Do(func() {
		watchCacheInstance = &watchCache{Cache: sync.Map{}}
	})
	return watchCacheInstance
}
+
+// NewPersistedRevision creates a new instance of a PersistentRevision structure
+func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ pr := &PersistedRevision{}
+ pr.kvStore = branch.Node.GetRoot().KvStore
+ pr.Version = 1
+ pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
+ return pr
+}
+
// getVersion returns the last KV-store version observed for this revision.
func (pr *PersistedRevision) getVersion() int64 {
	pr.versionMutex.RLock()
	defer pr.versionMutex.RUnlock()
	return pr.Version
}

// setVersion records the KV-store version for this revision.
func (pr *PersistedRevision) setVersion(version int64) {
	pr.versionMutex.Lock()
	defer pr.versionMutex.Unlock()
	pr.Version = version
}
+
// Finalize is responsible of saving the revision in the persistent storage.
// It delegates to store, which skips persistence for transacted branches.
func (pr *PersistedRevision) Finalize(skipOnExist bool) {
	pr.store(skipOnExist)
}
+
+func (pr *PersistedRevision) store(skipOnExist bool) {
+ if pr.GetBranch().Txid != "" {
+ return
+ }
+
+ log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
+
+ // clone the revision data to avoid any race conditions with processes
+ // accessing the same data
+ cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
+
+ if blob, err := proto.Marshal(cloned); err != nil {
+ log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
+ } else {
+ if pr.Compress {
+ var b bytes.Buffer
+ w := gzip.NewWriter(&b)
+ w.Write(blob)
+ w.Close()
+ blob = b.Bytes()
+ }
+
+ GetRevCache().Set(pr.GetName(), pr)
+ if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
+ log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+ } else {
+ log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
+ pr.isStored = true
+ }
+ }
+}
+
+func (pr *PersistedRevision) SetupWatch(key string) {
+ if key == "" {
+ log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ return
+ }
+
+ if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+ return
+ }
+
+ if pr.events == nil {
+ pr.events = make(chan *kvstore.Event)
+
+ log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+
+ pr.SetName(key)
+ pr.events = pr.kvStore.CreateWatch(key)
+ }
+
+ if !pr.isWatched {
+ pr.isWatched = true
+
+ log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+
+ // Start watching
+ go pr.startWatching()
+ }
+}
+
// startWatching consumes KV-store watch events for this revision until the
// channel closes or the entry is deleted. PUT events newer than the in-memory
// version are reloaded from persistence (under a path reservation when a
// proxy exists); DELETE events evict the revision from cache and parent.
// Runs as a goroutine started by SetupWatch.
func (pr *PersistedRevision) startWatching() {
	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})

StopWatchLoop:
	for {
		// Re-resolve the latest revision each iteration: the branch head may
		// have been replaced since the previous event.
		latestRev := pr.GetBranch().GetLatest()

		select {
		case event, ok := <-pr.events:
			if !ok {
				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
				break StopWatchLoop
			}
			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})

			switch event.EventType {
			case kvstore.DELETE:
				log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})

				// Remove reference from cache
				GetRevCache().Delete(latestRev.GetName())

				// Remove reference from parent
				parent := pr.GetBranch().Node.GetRoot()
				parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())

				// The entry no longer exists; the watch is done.
				break StopWatchLoop

			case kvstore.PUT:
				log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
				// Ignore events carrying the same or an older version than memory.
				if latestRev.getVersion() >= event.Version {
					log.Debugw("skipping-matching-or-older-revision", log.Fields{
						"watch":          latestRev.GetName(),
						"watch-version":  event.Version,
						"latest-version": latestRev.getVersion(),
					})
					continue
				} else {
					log.Debugw("watch-revision-is-newer", log.Fields{
						"watch":          latestRev.GetName(),
						"watch-version":  event.Version,
						"latest-version": latestRev.getVersion(),
					})
				}

				// Allocate a fresh value of the revision's concrete proto type.
				data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())

				if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
					log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
				} else {
					log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})

					var pathLock string
					var blobs map[string]*kvstore.KVPair

					// The watch reported new persistence data.
					// Construct an object that will be used to update the memory
					blobs = make(map[string]*kvstore.KVPair)
					key, _ := kvstore.ToString(event.Key)
					blobs[key] = &kvstore.KVPair{
						Key:     key,
						Value:   event.Value,
						Session: "",
						Lease:   0,
						Version: event.Version,
					}

					if latestRev.GetNode().GetProxy() != nil {
						//
						// If a proxy exists for this revision, use it to lock access to the path
						// and prevent simultaneous updates to the object in memory
						//

						//If the proxy already has a request in progress, then there is no need to process the watch
						if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
							log.Debugw("operation-in-progress", log.Fields{
								"key":       latestRev.GetHash(),
								"path":      latestRev.GetNode().GetProxy().getFullPath(),
								"operation": latestRev.GetNode().GetProxy().operation.String(),
							})
							continue
						}

						pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())

						// Reserve the path to prevent others to modify while we reload from persistence
						latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
						latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)

						// Load changes and apply to memory
						latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)

						// Release path
						latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")

					} else {
						// This block should be reached only if coming from a non-proxied request
						log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})

						// Load changes and apply to memory
						latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
					}
				}

			default:
				log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
			}
		}
	}

	// Allow a future SetupWatch for this key/revision to register again.
	Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())

	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
}
+
+// UpdateData modifies the information in the data model and saves it in the persistent storage
+func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
+ log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
+
+ newNPR := pr.Revision.UpdateData(ctx, data, branch)
+
+ newPR := &PersistedRevision{
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ Version: pr.getVersion(),
+ isWatched: pr.isWatched,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ pr.Drop(branch.Txid, false)
+ } else {
+ newPR.isStored = true
+ }
+
+ return newPR
+}
+
+// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
+func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
+ log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
+
+ newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
+
+ newPR := &PersistedRevision{
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ Version: pr.getVersion(),
+ isWatched: pr.isWatched,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ } else {
+ newPR.isStored = true
+ }
+
+ return newPR
+}
+
+// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
+func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+ log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
+
+ newNPR := pr.Revision.UpdateAllChildren(children, branch)
+
+ newPR := &PersistedRevision{
+ Revision: newNPR,
+ Compress: pr.Compress,
+ kvStore: pr.kvStore,
+ events: pr.events,
+ Version: pr.getVersion(),
+ isWatched: pr.isWatched,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ } else {
+ newPR.isStored = true
+ }
+
+ return newPR
+}
+
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required. Persisted state is NOT removed
// here — use StorageDrop for that.
func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
	pr.Revision.Drop(txid, includeConfig)
}
+
// StorageDrop removes the revision from the KV store (stopping any active
// watch first), then performs the in-memory Drop. Revisions belonging to an
// in-flight transaction (txid != "") are never removed from storage.
// (Header previously duplicated Drop's comment verbatim.)
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
	log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})

	pr.mutex.Lock()
	defer pr.mutex.Unlock()
	if pr.kvStore != nil && txid == "" {
		if pr.isStored {
			// Stop the watch before deleting, so the watcher does not react
			// to our own delete.
			if pr.isWatched {
				pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
				pr.isWatched = false
			}

			if err := pr.kvStore.Delete(pr.GetName()); err != nil {
				log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
			} else {
				pr.isStored = false
			}
		}

	} else {
		if includeConfig {
			log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
		}
		log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
	}

	pr.Revision.Drop(txid, includeConfig)
}
+
+// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
+// It reconciles one KV-store blob against the in-memory revision tree:
+//   - if a child with keyValue exists and the persisted data is both different
+//     AND newer (higher version), the child is replaced and the parent revision
+//     is refreshed;
+//   - if a child exists and is current, only its timestamp/version and cache
+//     entry are refreshed;
+//   - if no child matches, a new child node is created and attached to the
+//     parent revision.
+// Returns the refreshed or newly created revision, or nil when nothing was applied.
+func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
+	keyValue string, txid string, version int64) (response Revision) {
+	// Parent which holds the current node entry
+	parent := pr.GetBranch().Node.GetRoot()
+
+	// Get a copy of the parent's children
+	children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
+	copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
+
+	// Verify if a child with the provided key value can be found
+	if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
+		// A child matching the provided key exists in memory
+		// Verify if the data differs from what was retrieved from persistence
+		// Also check if we are treating a newer revision of the data or not
+		if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
+			log.Debugw("revision-data-is-different", log.Fields{
+				"key":               childRev.GetHash(),
+				"name":              childRev.GetName(),
+				"data":              childRev.GetData(),
+				"in-memory-version": childRev.getVersion(),
+				"persisted-version": version,
+			})
+
+			//
+			// Data has changed; replace the child entry and update the parent revision
+			//
+
+			// BEGIN Lock child -- prevent any incoming changes
+			childRev.GetBranch().LatestLock.Lock()
+
+			// Update child
+			updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
+
+			// Carry the proxy over to the new revision and re-arm its watch
+			updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
+			updatedChildRev.SetupWatch(updatedChildRev.GetName())
+			updatedChildRev.SetLastUpdate()
+			updatedChildRev.(*PersistedRevision).setVersion(version)
+
+			// Update cache
+			GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
+			childRev.Drop(txid, false)
+
+			childRev.GetBranch().LatestLock.Unlock()
+			// END lock child
+
+			// Update child entry
+			children[childIdx] = updatedChildRev
+
+			// BEGIN lock parent -- Update parent
+			// NOTE: the child lock is released before the parent lock is taken,
+			// preserving a consistent child-then-parent lock ordering
+			parent.GetBranch(NONE).LatestLock.Lock()
+
+			updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
+			parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
+
+			parent.GetBranch(NONE).LatestLock.Unlock()
+			// END lock parent
+
+			// Drop the previous child revision
+			parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
+
+			if updatedChildRev != nil {
+				log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+					"key":  updatedChildRev.GetHash(),
+					"name": updatedChildRev.GetName(),
+					"data": updatedChildRev.GetData(),
+				})
+				response = updatedChildRev
+			}
+		} else {
+			// Persisted data is identical or older: keep the in-memory revision
+			if childRev != nil {
+				log.Debugw("keeping-revision-data", log.Fields{
+					"key":                 childRev.GetHash(),
+					"name":                childRev.GetName(),
+					"data":                childRev.GetData(),
+					"in-memory-version":   childRev.getVersion(),
+					"persistence-version": version,
+				})
+
+				// Update timestamp to reflect when it was last read and to reset tracked timeout
+				childRev.SetLastUpdate()
+				if childRev.getVersion() < version {
+					childRev.(*PersistedRevision).setVersion(version)
+				}
+				GetRevCache().Set(childRev.GetName(), childRev)
+				response = childRev
+			}
+		}
+
+	} else {
+		// There is no available child with that key value.
+		// Create a new child and update the parent revision.
+		log.Debugw("no-such-revision-entry", log.Fields{
+			"key":     keyValue,
+			"name":    typeName,
+			"data":    data,
+			"version": version,
+		})
+
+		// BEGIN child lock
+		pr.GetBranch().LatestLock.Lock()
+
+		// Construct a new child node with the retrieved persistence data
+		childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+
+		// We need to start watching this entry for future changes
+		childRev.SetName(typeName + "/" + keyValue)
+		childRev.SetupWatch(childRev.GetName())
+		childRev.(*PersistedRevision).setVersion(version)
+
+		// Add entry to cache
+		GetRevCache().Set(childRev.GetName(), childRev)
+
+		pr.GetBranch().LatestLock.Unlock()
+		// END child lock
+
+		//
+		// Add the child to the parent revision
+		//
+
+		// BEGIN parent lock
+		parent.GetBranch(NONE).LatestLock.Lock()
+		children = append(children, childRev)
+		updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
+		updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
+		parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
+		parent.GetBranch(NONE).LatestLock.Unlock()
+		// END parent lock
+
+		// Child entry is valid and can be included in the response object
+		if childRev != nil {
+			log.Debugw("adding-revision-to-response", log.Fields{
+				"key":  childRev.GetHash(),
+				"name": childRev.GetName(),
+				"data": childRev.GetData(),
+			})
+			response = childRev
+		}
+	}
+
+	return response
+}
+
+// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
+// by adding missing entries, updating changed entries and ignoring unchanged ones
+// When blobs is nil or empty the entries are fetched from the KV store at path;
+// otherwise the supplied blobs are reconciled directly (e.g. from a watch event).
+// Returns the list of revisions that were added or updated.
+func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+	pr.mutex.Lock()
+	defer pr.mutex.Unlock()
+
+	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
+	var response []Revision
+
+	// Normalize the path by stripping all leading slashes
+	for strings.HasPrefix(path, "/") {
+		path = path[1:]
+	}
+
+	if pr.kvStore != nil && path != "" {
+		// List() error is intentionally discarded; blobs stays nil and the
+		// loop below simply processes nothing
+		if blobs == nil || len(blobs) == 0 {
+			log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
+			blobs, _ = pr.kvStore.List(path)
+		}
+
+		// The first path component names the field; the remainder (if any)
+		// narrows the request down to a specific entry
+		partition := strings.SplitN(path, "/", 2)
+		name := partition[0]
+
+		var nodeType interface{}
+		if len(partition) < 2 {
+			path = ""
+			nodeType = pr.GetBranch().Node.Type
+		} else {
+			path = partition[1]
+			nodeType = pr.GetBranch().Node.GetRoot().Type
+		}
+
+		field := ChildrenFields(nodeType)[name]
+
+		if field != nil && field.IsContainer {
+			log.Debugw("parsing-data-blobs", log.Fields{
+				"path": path,
+				"name": name,
+				"size": len(blobs),
+			})
+
+			for _, blob := range blobs {
+				output := blob.Value.([]byte)
+
+				// Allocate a fresh instance of the field's element type and
+				// unmarshal the persisted bytes into it
+				data := reflect.New(field.ClassType.Elem())
+
+				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+					log.Errorw("failed-to-unmarshal", log.Fields{
+						"path":  path,
+						"txid":  txid,
+						"error": err,
+					})
+				} else if path == "" {
+					if field.Key != "" {
+						log.Debugw("no-path-with-container-key", log.Fields{
+							"path": path,
+							"txid": txid,
+							"data": data.Interface(),
+						})
+
+						// Retrieve the key identifier value from the data structure
+						// based on the field's key attribute
+						_, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+						if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
+							response = append(response, entry)
+						}
+					} else {
+						// Keyless containers cannot be reconciled per-entry
+						log.Debugw("path-with-no-container-key", log.Fields{
+							"path": path,
+							"txid": txid,
+							"data": data.Interface(),
+						})
+					}
+
+				} else if field.Key != "" {
+					log.Debugw("path-with-container-key", log.Fields{
+						"path": path,
+						"txid": txid,
+						"data": data.Interface(),
+					})
+					// The request is for a specific entry/id
+					partition := strings.SplitN(path, "/", 2)
+					key := partition[0]
+					if len(partition) < 2 {
+						path = ""
+					} else {
+						path = partition[1]
+					}
+					keyValue := field.KeyFromStr(key)
+
+					if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
+						response = append(response, entry)
+					}
+				}
+			}
+
+			log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
+		} else {
+			log.Debugw("cannot-process-field", log.Fields{
+				"type": pr.GetBranch().Node.Type,
+				"name": name,
+			})
+		}
+	}
+
+	return response
+}
diff --git a/db/model/profiling.go b/db/model/profiling.go
new file mode 100644
index 0000000..f8e9f7a
--- /dev/null
+++ b/db/model/profiling.go
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "sync"
+)
+
+// Profiling is used to store performance details collected at runtime
+// The embedded RWMutex guards all counters; cumulative times are expressed in
+// whatever unit the callers supply (not enforced here).
+type profiling struct {
+	sync.RWMutex
+	DatabaseRetrieveTime  float64 // cumulative time spent retrieving from the database
+	DatabaseRetrieveCount int     // number of database retrievals
+	InMemoryModelTime     float64 // cumulative time spent (de)constructing in-memory model data
+	InMemoryModelCount    int     // number of in-memory model operations
+	InMemoryProcessTime   float64 // cumulative time spent processing data
+	DatabaseStoreTime     float64 // cumulative time spent storing data in the database
+	InMemoryLockTime      float64 // cumulative time spent holding in-memory locks
+	InMemoryLockCount     int     // number of in-memory lock acquisitions
+}
+
+// profilingInstance is the package-wide singleton, created exactly once by GetProfiling.
+var profilingInstance *profiling
+var profilingOnce sync.Once
+
+// GetProfiling returns a singleton instance of the Profiling structure
+func GetProfiling() *profiling {
+	profilingOnce.Do(func() {
+		profilingInstance = &profiling{}
+	})
+	return profilingInstance
+}
+
+// AddToDatabaseRetrieveTime appends a time period to retrieve data from the database
+// and increments the retrieval count used to compute the average.
+func (p *profiling) AddToDatabaseRetrieveTime(period float64) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.DatabaseRetrieveTime += period
+	p.DatabaseRetrieveCount++
+}
+
+// AddToInMemoryModelTime appends a time period to construct/deconstruct data in memory
+// and increments the model-operation count used to compute the average.
+func (p *profiling) AddToInMemoryModelTime(period float64) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.InMemoryModelTime += period
+	p.InMemoryModelCount++
+}
+
+// AddToInMemoryProcessTime appends a time period to process data
+func (p *profiling) AddToInMemoryProcessTime(period float64) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.InMemoryProcessTime += period
+}
+
+// AddToDatabaseStoreTime appends a time period to store data in the database
+func (p *profiling) AddToDatabaseStoreTime(period float64) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.DatabaseStoreTime += period
+}
+
+// AddToInMemoryLockTime appends a time period when a code block was locked
+// and increments the lock count used to compute the average.
+func (p *profiling) AddToInMemoryLockTime(period float64) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.InMemoryLockTime += period
+	p.InMemoryLockCount++
+}
+
+// Reset initializes the profile counters
+// All cumulative times and counts are zeroed under the write lock.
+func (p *profiling) Reset() {
+	p.Lock()
+	defer p.Unlock()
+
+	p.DatabaseRetrieveTime = 0
+	p.DatabaseRetrieveCount = 0
+	p.InMemoryModelTime = 0
+	p.InMemoryModelCount = 0
+	p.InMemoryProcessTime = 0
+	p.DatabaseStoreTime = 0
+	p.InMemoryLockTime = 0
+	p.InMemoryLockCount = 0
+}
+
+// Report will provide the current profile counter status
+// It only reads the counters, so a read lock is sufficient (writers such as
+// AddTo* and Reset take the write lock).  Averages are emitted only when at
+// least one sample was collected, avoiding NaN/Inf from a zero division.
+func (p *profiling) Report() {
+	p.RLock()
+	defer p.RUnlock()
+
+	log.Infof("[ Profiling Report ]")
+	log.Infof("Database Retrieval : %f", p.DatabaseRetrieveTime)
+	log.Infof("Database Retrieval Count : %d", p.DatabaseRetrieveCount)
+	if p.DatabaseRetrieveCount > 0 {
+		log.Infof("Avg Database Retrieval : %f", p.DatabaseRetrieveTime/float64(p.DatabaseRetrieveCount))
+	}
+	log.Infof("In-Memory Modeling : %f", p.InMemoryModelTime)
+	log.Infof("In-Memory Modeling Count: %d", p.InMemoryModelCount)
+	if p.InMemoryModelCount > 0 {
+		log.Infof("Avg In-Memory Modeling : %f", p.InMemoryModelTime/float64(p.InMemoryModelCount))
+	}
+	log.Infof("In-Memory Locking : %f", p.InMemoryLockTime)
+	log.Infof("In-Memory Locking Count: %d", p.InMemoryLockCount)
+	if p.InMemoryLockCount > 0 {
+		log.Infof("Avg In-Memory Locking : %f", p.InMemoryLockTime/float64(p.InMemoryLockCount))
+	}
+}
diff --git a/db/model/proxy.go b/db/model/proxy.go
new file mode 100644
index 0000000..b5378fe
--- /dev/null
+++ b/db/model/proxy.go
@@ -0,0 +1,598 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "context"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+ "runtime"
+ "strings"
+ "sync"
+)
+
+// OperationContext holds details on the information used during an operation
+type OperationContext struct {
+	Path      string      // model path the operation applies to
+	Data      interface{} // payload carried by the operation
+	FieldName string      // name of the field being acted upon
+	ChildKey  string      // key of the child entry, when applicable
+}
+
+// NewOperationContext instantiates a new OperationContext structure
+func NewOperationContext(path string, data interface{}, fieldName string, childKey string) *OperationContext {
+	oc := &OperationContext{
+		Path:      path,
+		Data:      data,
+		FieldName: fieldName,
+		ChildKey:  childKey,
+	}
+	return oc
+}
+
+// Update applies new data to the context structure and returns the same
+// context to allow call chaining.
+func (oc *OperationContext) Update(data interface{}) *OperationContext {
+	oc.Data = data
+	return oc
+}
+
+// Proxy holds the information for a specific location with the data model
+type Proxy struct {
+	mutex      sync.RWMutex // guards Callbacks and operation
+	Root       *root        // root of the data model this proxy operates on
+	Node       *node        // node the proxy is anchored at
+	ParentNode *node        // parent of the anchored node
+	Path       string       // path relative to the anchored node
+	FullPath   string       // path from the model root ("" when anchored at root)
+	Exclusive  bool         // whether the proxy has exclusive access to its location
+	Callbacks  map[CallbackType]map[string]*CallbackTuple
+	operation  ProxyOperation // operation currently in progress
+}
+
+// NewProxy instantiates a new proxy to a specific location
+// A full path of "/" is normalized to the empty string so that later
+// concatenations (FullPath + path) do not produce a double slash.
+func NewProxy(root *root, node *node, parentNode *node, path string, fullPath string, exclusive bool) *Proxy {
+	callbacks := make(map[CallbackType]map[string]*CallbackTuple)
+	if fullPath == "/" {
+		fullPath = ""
+	}
+	p := &Proxy{
+		Root:       root,
+		Node:       node,
+		ParentNode: parentNode,
+		Exclusive:  exclusive,
+		Path:       path,
+		FullPath:   fullPath,
+		Callbacks:  callbacks,
+	}
+	return p
+}
+
+// GetRoot returns the root attribute of the proxy
+func (p *Proxy) GetRoot() *root {
+	return p.Root
+}
+
+// getPath returns the path attribute of the proxy (relative to its node)
+func (p *Proxy) getPath() string {
+	return p.Path
+}
+
+// getFullPath returns the full path attribute of the proxy (from the model root)
+func (p *Proxy) getFullPath() string {
+	return p.FullPath
+}
+
+// getCallbacks returns the full list of callbacks associated to the proxy
+// for the given callback type, or nil if none are registered.
+// The nil-receiver check must run before any dereference: the original code
+// locked p.mutex first, which would already have panicked on a nil proxy,
+// making the guard unreachable.
+func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
+	if p == nil {
+		log.Debugw("proxy-is-nil", log.Fields{"callback-type": callbackType.String()})
+		return nil
+	}
+
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+
+	if cb, exists := p.Callbacks[callbackType]; exists {
+		return cb
+	}
+	return nil
+}
+
+// getCallback returns a specific callback matching the type and function hash,
+// or nil when no such callback is registered.
+// This is a read-only lookup, so a read lock suffices (the original took the
+// exclusive write lock, needlessly blocking concurrent readers).
+func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+	if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
+		return tuple
+	}
+	return nil
+}
+
+// setCallbacks applies a callbacks list to a type
+func (p *Proxy) setCallbacks(callbackType CallbackType, callbacks map[string]*CallbackTuple) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	p.Callbacks[callbackType] = callbacks
+}
+
+// setCallback applies a callback to a type and hash value
+// NOTE: assumes the inner map for callbackType already exists (RegisterCallback
+// creates it via setCallbacks); writing to a nil map would panic.
+func (p *Proxy) setCallback(callbackType CallbackType, funcHash string, tuple *CallbackTuple) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	p.Callbacks[callbackType][funcHash] = tuple
+}
+
+// DeleteCallback removes a callback matching the type and hash
+func (p *Proxy) DeleteCallback(callbackType CallbackType, funcHash string) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	delete(p.Callbacks[callbackType], funcHash)
+}
+
+// ProxyOperation is an enumerated value describing the operation a proxy is
+// currently performing.
+type ProxyOperation uint8
+
+// Enumerated list of proxy operations
+const (
+	PROXY_NONE ProxyOperation = iota
+	PROXY_GET
+	PROXY_LIST
+	PROXY_ADD
+	PROXY_UPDATE
+	PROXY_REMOVE
+	PROXY_CREATE
+	PROXY_WATCH
+)
+
+// proxyOperationTypes maps each ProxyOperation value to its display name;
+// the order must match the constant declarations above.
+var proxyOperationTypes = []string{
+	"PROXY_NONE",
+	"PROXY_GET",
+	"PROXY_LIST",
+	"PROXY_ADD",
+	"PROXY_UPDATE",
+	"PROXY_REMOVE",
+	"PROXY_CREATE",
+	"PROXY_WATCH",
+}
+
+// String returns the display name of the operation.  An out-of-range value
+// is reported explicitly instead of causing an index-out-of-range panic.
+func (t ProxyOperation) String() string {
+	if int(t) >= len(proxyOperationTypes) {
+		return fmt.Sprintf("UNKNOWN_PROXY_OPERATION(%d)", t)
+	}
+	return proxyOperationTypes[t]
+}
+
+// GetOperation returns the operation the proxy is currently performing.
+func (p *Proxy) GetOperation() ProxyOperation {
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
+	return p.operation
+}
+
+// SetOperation records the operation the proxy is currently performing.
+func (p *Proxy) SetOperation(operation ProxyOperation) {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	p.operation = operation
+}
+
+// parseForControlledPath verifies if a proxy path matches a pattern
+// for locations that need to be access controlled.
+// For a controlled prefix the lock name is "<collection>/<id>" (e.g.
+// "devices/<device-id>"); a bare collection path such as "/devices"
+// (two split components: "" and "devices") is not controlled.
+func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
+	// TODO: Add other path prefixes that may need control
+	if strings.HasPrefix(path, "/devices") ||
+		strings.HasPrefix(path, "/logical_devices") ||
+		strings.HasPrefix(path, "/adapters") {
+
+		// strings.Split is equivalent to the original SplitN(path, "/", -1);
+		// the explicit case-2/fallthrough switch collapses to a single check
+		if split := strings.Split(path, "/"); len(split) > 2 {
+			pathLock = fmt.Sprintf("%s/%s", split[1], split[2])
+			controlled = true
+		}
+	}
+	return pathLock, controlled
+}
+
+// List will retrieve information from the data model at the specified path location
+// A list operation will force access to persistence storage
+// Unlike the mutating operations, no KV reservation is taken here; pathLock is
+// computed only for logging.  Note that the root receives the proxy-relative
+// path, not effectivePath.
+func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
+	var effectivePath string
+	if path == "/" {
+		effectivePath = p.getFullPath()
+	} else {
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_LIST)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-list", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	rv := p.GetRoot().List(ctx, path, "", depth, deep, txid)
+
+	return rv
+}
+
+// Get will retrieve information from the data model at the specified path location
+// As with List, no KV reservation is taken for a read.
+func (p *Proxy) Get(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
+	var effectivePath string
+	if path == "/" {
+		effectivePath = p.getFullPath()
+	} else {
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_GET)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-get", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	rv := p.GetRoot().Get(ctx, path, "", depth, deep, txid)
+
+	return rv
+}
+
+// Update will modify information in the data model at the specified location with the provided data
+// The path must be absolute (start with "/"); otherwise nil is returned.
+// Returns the updated data, or nil on failure.
+func (p *Proxy) Update(ctx context.Context, path string, data interface{}, strict bool, txid string) interface{} {
+	if !strings.HasPrefix(path, "/") {
+		log.Errorf("invalid path: %s", path)
+		return nil
+	}
+	var fullPath string
+	var effectivePath string
+	if path == "/" {
+		fullPath = p.getPath()
+		effectivePath = p.getFullPath()
+	} else {
+		fullPath = p.getPath() + path
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_UPDATE)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-update", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"full":       fullPath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	// NOTE(review): the Reserve/ReleaseReservation return values are ignored,
+	// and the reservation is attempted even when controlled is false (lock
+	// name is then just "_") — confirm this is intentional.
+	if p.GetRoot().KvStore != nil {
+		p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+		defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+	}
+
+	result := p.GetRoot().Update(ctx, fullPath, data, strict, txid, nil)
+
+	if result != nil {
+		return result.GetData()
+	}
+
+	return nil
+}
+
+// AddWithID will insert new data at specified location.
+// This method also allows the user to specify the ID of the data entry to ensure
+// that access control is active while inserting the information.
+// Note: the id only extends effectivePath (used for the access-control lock),
+// not the fullPath passed to the root Add.
+func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data interface{}, txid string) interface{} {
+	if !strings.HasPrefix(path, "/") {
+		log.Errorf("invalid path: %s", path)
+		return nil
+	}
+	var fullPath string
+	var effectivePath string
+	if path == "/" {
+		fullPath = p.getPath()
+		effectivePath = p.getFullPath()
+	} else {
+		fullPath = p.getPath() + path
+		effectivePath = p.getFullPath() + path + "/" + id
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_ADD)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-add-with-id", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"full":       fullPath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	if p.GetRoot().KvStore != nil {
+		p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+		defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+	}
+
+	result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
+
+	if result != nil {
+		return result.GetData()
+	}
+
+	return nil
+}
+
+// Add will insert new data at specified location.
+// The path must be absolute (start with "/"); otherwise nil is returned.
+// Returns the added data, or nil on failure.
+func (p *Proxy) Add(ctx context.Context, path string, data interface{}, txid string) interface{} {
+	if !strings.HasPrefix(path, "/") {
+		log.Errorf("invalid path: %s", path)
+		return nil
+	}
+	var fullPath string
+	var effectivePath string
+	if path == "/" {
+		fullPath = p.getPath()
+		effectivePath = p.getFullPath()
+	} else {
+		fullPath = p.getPath() + path
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_ADD)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-add", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"full":       fullPath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	if p.GetRoot().KvStore != nil {
+		p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+		defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+	}
+
+	result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
+
+	if result != nil {
+		return result.GetData()
+	}
+
+	return nil
+}
+
+// Remove will delete an entry at the specified location
+// The path must be absolute (start with "/"); otherwise nil is returned.
+// Returns the removed data, or nil on failure.
+func (p *Proxy) Remove(ctx context.Context, path string, txid string) interface{} {
+	if !strings.HasPrefix(path, "/") {
+		log.Errorf("invalid path: %s", path)
+		return nil
+	}
+	var fullPath string
+	var effectivePath string
+	if path == "/" {
+		fullPath = p.getPath()
+		effectivePath = p.getFullPath()
+	} else {
+		fullPath = p.getPath() + path
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_REMOVE)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-remove", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"full":       fullPath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	if p.GetRoot().KvStore != nil {
+		p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+		defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+	}
+
+	result := p.GetRoot().Remove(ctx, fullPath, txid, nil)
+
+	if result != nil {
+		return result.GetData()
+	}
+
+	return nil
+}
+
+// CreateProxy to interact with specific path directly
+// The path must be absolute (start with "/"); otherwise nil is returned.
+func (p *Proxy) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+	if !strings.HasPrefix(path, "/") {
+		log.Errorf("invalid path: %s", path)
+		return nil
+	}
+
+	var fullPath string
+	var effectivePath string
+	if path == "/" {
+		fullPath = p.getPath()
+		effectivePath = p.getFullPath()
+	} else {
+		fullPath = p.getPath() + path
+		effectivePath = p.getFullPath() + path
+	}
+
+	pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+	p.SetOperation(PROXY_CREATE)
+	defer p.SetOperation(PROXY_NONE)
+
+	log.Debugw("proxy-create", log.Fields{
+		"path":       path,
+		"effective":  effectivePath,
+		"full":       fullPath,
+		"pathLock":   pathLock,
+		"controlled": controlled,
+		"operation":  p.GetOperation(),
+	})
+
+	if p.GetRoot().KvStore != nil {
+		p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+		defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+	}
+
+	return p.GetRoot().CreateProxy(ctx, fullPath, exclusive)
+}
+
+// OpenTransaction creates a new transaction branch to isolate operations made to the data model
+func (p *Proxy) OpenTransaction() *Transaction {
+	txid := p.GetRoot().MakeTxBranch()
+	return NewTransaction(p, txid)
+}
+
+// commitTransaction will apply and merge modifications made in the transaction branch to the data model
+func (p *Proxy) commitTransaction(txid string) {
+	p.GetRoot().FoldTxBranch(txid)
+}
+
+// cancelTransaction will terminate a transaction branch along with all changes within it
+func (p *Proxy) cancelTransaction(txid string) {
+	p.GetRoot().DeleteTxBranch(txid)
+}
+
+// CallbackFunction is a type used to define callback functions
+type CallbackFunction func(args ...interface{}) interface{}
+
+// CallbackTuple holds the function and arguments details of a callback
+type CallbackTuple struct {
+	callback CallbackFunction // function to invoke
+	args     []interface{}    // arguments captured at registration time
+}
+
+// Execute will process the callback with its registered arguments followed by
+// the provided context arguments, and return the callback's result.
+// The argument slice is built with pre-sized variadic appends instead of the
+// original element-by-element loops; a nil contextArgs appends nothing, which
+// matches the original's explicit nil check.
+func (tuple *CallbackTuple) Execute(contextArgs []interface{}) interface{} {
+	args := make([]interface{}, 0, len(tuple.args)+len(contextArgs))
+	args = append(args, tuple.args...)
+	args = append(args, contextArgs...)
+	return tuple.callback(args...)
+}
+
+// RegisterCallback associates a callback to the proxy
+// Callbacks are keyed by the first 12 hex characters of the MD5 of the
+// function's name (identity only, not a security use of MD5), so registering
+// the same function twice replaces the earlier tuple.
+func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	if p.getCallbacks(callbackType) == nil {
+		p.setCallbacks(callbackType, make(map[string]*CallbackTuple))
+	}
+	funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	log.Debugf("value of function: %s", funcName)
+	funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+
+	p.setCallback(callbackType, funcHash, &CallbackTuple{callback, args})
+}
+
+// UnregisterCallback removes references to a callback within a proxy
+// The function is matched by the same name-hash used at registration time.
+func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
+	if p.getCallbacks(callbackType) == nil {
+		log.Errorf("no such callback type - %s", callbackType.String())
+		return
+	}
+
+	funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
+	funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
+
+	log.Debugf("value of function: %s", funcName)
+
+	if p.getCallback(callbackType, funcHash) == nil {
+		log.Errorf("function with hash value: '%s' not registered with callback type: '%s'", funcHash, callbackType)
+		return
+	}
+
+	p.DeleteCallback(callbackType, funcHash)
+}
+
+// invoke executes a single callback tuple with the given context arguments.
+// A panic raised inside the callback is recovered and converted into an
+// error so one misbehaving callback cannot crash the caller.
+func (p *Proxy) invoke(callback *CallbackTuple, context []interface{}) (result interface{}, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			errStr := fmt.Sprintf("callback error occurred: %+v", r)
+			err = errors.New(errStr)
+			log.Error(errStr)
+		}
+	}()
+
+	result = callback.Execute(context)
+
+	return result, err
+}
+
+// InvokeCallbacks executes all callbacks associated to a specific type
+// args[0] must be the CallbackType, args[1] a bool indicating whether to
+// proceed past a failing callback; the remaining args are forwarded to each
+// callback.  Returns the result of the last invoked callback.
+// NOTE(review): p.mutex is held for the whole invocation loop; a callback
+// that calls back into this proxy's locking methods (e.g. getCallbacks)
+// could deadlock since sync.RWMutex is not reentrant — confirm callbacks
+// never re-enter the proxy.
+func (p *Proxy) InvokeCallbacks(args ...interface{}) (result interface{}) {
+	callbackType := args[0].(CallbackType)
+	proceedOnError := args[1].(bool)
+	context := args[2:]
+
+	var err error
+
+	if callbacks := p.getCallbacks(callbackType); callbacks != nil {
+		p.mutex.Lock()
+		for _, callback := range callbacks {
+			if result, err = p.invoke(callback, context); err != nil {
+				if !proceedOnError {
+					log.Info("An error occurred. Stopping callback invocation")
+					break
+				}
+				log.Info("An error occurred. Invoking next callback")
+			}
+		}
+		p.mutex.Unlock()
+	}
+
+	return result
+}
diff --git a/db/model/proxy_load_test.go b/db/model/proxy_load_test.go
new file mode 100644
index 0000000..3f3327b
--- /dev/null
+++ b/db/model/proxy_load_test.go
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "context"
+ "encoding/hex"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "math/rand"
+ "reflect"
+ "strconv"
+ "sync"
+ "testing"
+)
+
+var (
+ BenchmarkProxy_Root *root
+ BenchmarkProxy_DeviceProxy *Proxy
+ BenchmarkProxy_PLT *proxyLoadTest
+ BenchmarkProxy_Logger log.Logger
+)
+
+type proxyLoadChanges struct {
+ ID string
+ Before interface{}
+ After interface{}
+}
+type proxyLoadTest struct {
+ mutex sync.RWMutex
+
+ addMutex sync.RWMutex
+ addedDevices []string
+
+ firmwareMutex sync.RWMutex
+ updatedFirmwares []proxyLoadChanges
+ flowMutex sync.RWMutex
+ updatedFlows []proxyLoadChanges
+
+ preAddExecuted bool
+ postAddExecuted bool
+ preUpdateExecuted bool
+ postUpdateExecuted bool
+}
+
+func (plt *proxyLoadTest) SetPreAddExecuted(status bool) {
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
+ plt.preAddExecuted = status
+}
+func (plt *proxyLoadTest) SetPostAddExecuted(status bool) {
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
+ plt.postAddExecuted = status
+}
+func (plt *proxyLoadTest) SetPreUpdateExecuted(status bool) {
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
+ plt.preUpdateExecuted = status
+}
+func (plt *proxyLoadTest) SetPostUpdateExecuted(status bool) {
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
+ plt.postUpdateExecuted = status
+}
+
+func init() {
+ BenchmarkProxy_Root = NewRoot(&voltha.Voltha{}, nil)
+
+ BenchmarkProxy_Logger, _ = log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"})
+ //log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Update all loggers (provisioned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
+
+ BenchmarkProxy_DeviceProxy = BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
+ // Register ADD instructions callbacks
+ BenchmarkProxy_PLT = &proxyLoadTest{}
+
+ BenchmarkProxy_DeviceProxy.RegisterCallback(PRE_ADD, commonCallbackFunc, "PRE_ADD", BenchmarkProxy_PLT.SetPreAddExecuted)
+ BenchmarkProxy_DeviceProxy.RegisterCallback(POST_ADD, commonCallbackFunc, "POST_ADD", BenchmarkProxy_PLT.SetPostAddExecuted)
+
+ //// Register UPDATE instructions callbacks
+ BenchmarkProxy_DeviceProxy.RegisterCallback(PRE_UPDATE, commonCallbackFunc, "PRE_UPDATE", BenchmarkProxy_PLT.SetPreUpdateExecuted)
+ BenchmarkProxy_DeviceProxy.RegisterCallback(POST_UPDATE, commonCallbackFunc, "POST_UPDATE", BenchmarkProxy_PLT.SetPostUpdateExecuted)
+
+}
+
+func BenchmarkProxy_AddDevice(b *testing.B) {
+ defer GetProfiling().Report()
+ b.RunParallel(func(pb *testing.PB) {
+ b.Log("Started adding devices")
+ for pb.Next() {
+ ltPorts := []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "lt-port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "lt-port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+
+ ltStats := &openflow_13.OfpFlowStats{
+ Id: 1000,
+ }
+ ltFlows := &openflow_13.Flows{
+ Items: []*openflow_13.OfpFlowStats{ltStats},
+ }
+ ltDevice := &voltha.Device{
+ Id: "",
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ Flows: ltFlows,
+ Ports: ltPorts,
+ }
+
+ ltDevIDBin, _ := uuid.New().MarshalBinary()
+ ltDevID := "0001" + hex.EncodeToString(ltDevIDBin)[:12]
+ ltDevice.Id = ltDevID
+
+ BenchmarkProxy_PLT.SetPreAddExecuted(false)
+ BenchmarkProxy_PLT.SetPostAddExecuted(false)
+
+ var added interface{}
+ // Add the device
+ if added = BenchmarkProxy_DeviceProxy.AddWithID(context.Background(), "/devices", ltDevID, ltDevice, ""); added == nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add device: %+v", ltDevice)
+ continue
+ } else {
+ BenchmarkProxy_Logger.Infof("Device was added 1: %+v", added)
+ }
+
+ BenchmarkProxy_PLT.addMutex.Lock()
+ BenchmarkProxy_PLT.addedDevices = append(BenchmarkProxy_PLT.addedDevices, added.(*voltha.Device).Id)
+ BenchmarkProxy_PLT.addMutex.Unlock()
+ }
+ })
+
+ BenchmarkProxy_Logger.Infof("Number of added devices : %d", len(BenchmarkProxy_PLT.addedDevices))
+}
+
+func BenchmarkProxy_UpdateFirmware(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ //for i:=0; i < b.N; i++ {
+
+ if len(BenchmarkProxy_PLT.addedDevices) > 0 {
+ var target interface{}
+ randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
+ firmProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
+ if target = firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
+ ""); !reflect.ValueOf(target).IsValid() {
+ BenchmarkProxy_Logger.Errorf("Failed to find device: %s %+v", randomID, target)
+ continue
+ }
+
+ BenchmarkProxy_PLT.SetPreUpdateExecuted(false)
+ BenchmarkProxy_PLT.SetPostUpdateExecuted(false)
+ firmProxy.RegisterCallback(PRE_UPDATE, commonCallbackFunc, "PRE_UPDATE", BenchmarkProxy_PLT.SetPreUpdateExecuted)
+ firmProxy.RegisterCallback(POST_UPDATE, commonCallbackFunc, "POST_UPDATE", BenchmarkProxy_PLT.SetPostUpdateExecuted)
+
+ var fwVersion int
+
+ before := target.(*voltha.Device).FirmwareVersion
+ if target.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(target.(*voltha.Device).FirmwareVersion)
+ fwVersion++
+ }
+
+ target.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
+ after := target.(*voltha.Device).FirmwareVersion
+
+ var updated interface{}
+ if updated = firmProxy.Update(context.Background(), "/devices/"+randomID, target.(*voltha.Device), false,
+ ""); updated == nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update device: %+v", target)
+ continue
+ } else {
+ BenchmarkProxy_Logger.Infof("Device was updated : %+v", updated)
+
+ }
+
+ if d := firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ BenchmarkProxy_Logger.Errorf("Failed to get device: %s", randomID)
+ continue
+ } else if d.(*voltha.Device).FirmwareVersion == after {
+ BenchmarkProxy_Logger.Infof("Imm Device was updated with new value: %s %+v", randomID, d)
+ } else if d.(*voltha.Device).FirmwareVersion == before {
+ BenchmarkProxy_Logger.Errorf("Imm Device kept old value: %s %+v %+v", randomID, d, target)
+ } else {
+ BenchmarkProxy_Logger.Errorf("Imm Device has unknown value: %s %+v %+v", randomID, d, target)
+ }
+
+ BenchmarkProxy_PLT.firmwareMutex.Lock()
+
+ BenchmarkProxy_PLT.updatedFirmwares = append(
+ BenchmarkProxy_PLT.updatedFirmwares,
+ proxyLoadChanges{ID: randomID, Before: before, After: after},
+ )
+ BenchmarkProxy_PLT.firmwareMutex.Unlock()
+ }
+ }
+ })
+}
+
+func traverseBranches(revision Revision, depth int) {
+ if revision == nil {
+ return
+ }
+ prefix := strconv.Itoa(depth) + " ~~~~ "
+ for i := 0; i < depth; i++ {
+ prefix += " "
+ }
+
+ BenchmarkProxy_Logger.Debugf("%sRevision: %s %+v", prefix, revision.GetHash(), revision.GetData())
+
+ //for brIdx, brRev := range revision.GetBranch().Revisions {
+ // BenchmarkProxy_Logger.Debugf("%sbranchIndex: %s", prefix, brIdx)
+ // traverseBranches(brRev, depth+1)
+ //}
+ for childrenI, children := range revision.GetAllChildren() {
+ BenchmarkProxy_Logger.Debugf("%schildrenIndex: %s, length: %d", prefix, childrenI, len(children))
+
+ for _, subrev := range children {
+ //subrev.GetBranch().Latest
+ traverseBranches(subrev, depth+1)
+ }
+ }
+
+}
+func BenchmarkProxy_UpdateFlows(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ if len(BenchmarkProxy_PLT.addedDevices) > 0 {
+ randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
+
+ flowsProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/devices/"+randomID+"/flows", false)
+ flows := flowsProxy.Get(context.Background(), "/", 0, false, "")
+
+ before := flows.(*openflow_13.Flows).Items[0].TableId
+ flows.(*openflow_13.Flows).Items[0].TableId = uint32(rand.Intn(3000))
+ after := flows.(*openflow_13.Flows).Items[0].TableId
+
+ flowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback2,
+ )
+ flowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback2,
+ )
+
+ var updated interface{}
+ if updated = flowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ b.Errorf("Failed to update flows for device: %+v", flows)
+ } else {
+ BenchmarkProxy_Logger.Infof("Flows were updated : %+v", updated)
+ }
+ BenchmarkProxy_PLT.flowMutex.Lock()
+ BenchmarkProxy_PLT.updatedFlows = append(
+ BenchmarkProxy_PLT.updatedFlows,
+ proxyLoadChanges{ID: randomID, Before: before, After: after},
+ )
+ BenchmarkProxy_PLT.flowMutex.Unlock()
+ }
+ }
+ })
+}
+
+func BenchmarkProxy_GetDevices(b *testing.B) {
+ //traverseBranches(BenchmarkProxy_DeviceProxy.Root.node.Branches[NONE].GetLatest(), 0)
+
+ for i := 0; i < len(BenchmarkProxy_PLT.addedDevices); i++ {
+ devToGet := BenchmarkProxy_PLT.addedDevices[i]
+ // Verify that the added device can now be retrieved
+ if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
+ continue
+ } else {
+ BenchmarkProxy_Logger.Infof("Got device: %s %+v", devToGet, d)
+ }
+ }
+}
+
+func BenchmarkProxy_GetUpdatedFirmware(b *testing.B) {
+ for i := 0; i < len(BenchmarkProxy_PLT.updatedFirmwares); i++ {
+ devToGet := BenchmarkProxy_PLT.updatedFirmwares[i].ID
+ // Verify that the updated device can be retrieved and that the updates were actually applied
+ if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
+ continue
+ } else if d.(*voltha.Device).FirmwareVersion == BenchmarkProxy_PLT.updatedFirmwares[i].After.(string) {
+ BenchmarkProxy_Logger.Infof("Device was updated with new value: %s %+v", devToGet, d)
+ } else if d.(*voltha.Device).FirmwareVersion == BenchmarkProxy_PLT.updatedFirmwares[i].Before.(string) {
+ BenchmarkProxy_Logger.Errorf("Device kept old value: %s %+v %+v", devToGet, d, BenchmarkProxy_PLT.updatedFirmwares[i])
+ } else {
+ BenchmarkProxy_Logger.Errorf("Device has unknown value: %s %+v %+v", devToGet, d, BenchmarkProxy_PLT.updatedFirmwares[i])
+ }
+ }
+}
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
new file mode 100644
index 0000000..785c65b
--- /dev/null
+++ b/db/model/proxy_test.go
@@ -0,0 +1,661 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "context"
+ "encoding/hex"
+ "encoding/json"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "math/rand"
+ "reflect"
+ "strconv"
+ "testing"
+ "time"
+)
+
+var (
+ TestProxy_Root *root
+ TestProxy_Root_LogicalDevice *Proxy
+ TestProxy_Root_Device *Proxy
+ TestProxy_Root_Adapter *Proxy
+ TestProxy_DeviceId string
+ TestProxy_AdapterId string
+ TestProxy_LogicalDeviceId string
+ TestProxy_TargetDeviceId string
+ TestProxy_TargetLogicalDeviceId string
+ TestProxy_LogicalPorts []*voltha.LogicalPort
+ TestProxy_Ports []*voltha.Port
+ TestProxy_Stats *openflow_13.OfpFlowStats
+ TestProxy_Flows *openflow_13.Flows
+ TestProxy_Device *voltha.Device
+ TestProxy_LogicalDevice *voltha.LogicalDevice
+ TestProxy_Adapter *voltha.Adapter
+)
+
+func init() {
+ //log.AddPackage(log.JSON, log.InfoLevel, log.Fields{"instanceId": "DB_MODEL"})
+ //log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ TestProxy_Root = NewRoot(&voltha.Voltha{}, nil)
+ TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+ TestProxy_Root_Device = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+ TestProxy_Root_Adapter = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+
+ TestProxy_LogicalPorts = []*voltha.LogicalPort{
+ {
+ Id: "123",
+ DeviceId: "logicalport-0-device-id",
+ DevicePortNo: 123,
+ RootPort: false,
+ },
+ }
+ TestProxy_Ports = []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "test-port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "etcd_port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+
+ TestProxy_Stats = &openflow_13.OfpFlowStats{
+ Id: 1111,
+ }
+ TestProxy_Flows = &openflow_13.Flows{
+ Items: []*openflow_13.OfpFlowStats{TestProxy_Stats},
+ }
+ TestProxy_Device = &voltha.Device{
+ Id: TestProxy_DeviceId,
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ Flows: TestProxy_Flows,
+ Ports: TestProxy_Ports,
+ }
+
+ TestProxy_LogicalDevice = &voltha.LogicalDevice{
+ Id: TestProxy_DeviceId,
+ DatapathId: 0,
+ Ports: TestProxy_LogicalPorts,
+ Flows: TestProxy_Flows,
+ }
+
+ TestProxy_Adapter = &voltha.Adapter{
+ Id: TestProxy_AdapterId,
+ Vendor: "test-adapter-vendor",
+ Version: "test-adapter-version",
+ }
+}
+
+func TestProxy_1_1_1_Add_NewDevice(t *testing.T) {
+ devIDBin, _ := uuid.New().MarshalBinary()
+ TestProxy_DeviceId = "0001" + hex.EncodeToString(devIDBin)[:12]
+ TestProxy_Device.Id = TestProxy_DeviceId
+
+ preAddExecuted := make(chan struct{})
+ postAddExecuted := make(chan struct{})
+ preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
+
+ devicesProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices", false)
+ devicesProxy.RegisterCallback(PRE_ADD, commonCallback2, "PRE_ADD Device container changes")
+ devicesProxy.RegisterCallback(POST_ADD, commonCallback2, "POST_ADD Device container changes")
+
+ // Register ADD instructions callbacks
+ TestProxy_Root_Device.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
+ TestProxy_Root_Device.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
+
+ if added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, ""); added == nil {
+ t.Error("Failed to add device")
+ } else {
+ t.Logf("Added device : %+v", added)
+ }
+
+ if !verifyGotResponse(preAddExecuted) {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !verifyGotResponse(postAddExecuted) {
+ t.Error("POST_ADD callback was not executed")
+ }
+
+ // Verify that the added device can now be retrieved
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find added device")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device: %s", string(djson))
+ }
+}
+
+func TestProxy_1_1_2_Add_ExistingDevice(t *testing.T) {
+ TestProxy_Device.Id = TestProxy_DeviceId
+
+ added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, "")
+ if added.(proto.Message).String() != reflect.ValueOf(TestProxy_Device).Interface().(proto.Message).String() {
+		t.Errorf("Devices don't match - existing: %+v returned: %+v", TestProxy_Device, added)
+ }
+}
+
+func verifyGotResponse(callbackIndicator <-chan struct{}) bool {
+ timeout := time.After(1 * time.Second)
+ // Wait until the channel closes, or we time out
+ select {
+ case <-callbackIndicator:
+ // Received response successfully
+ return true
+
+ case <-timeout:
+ // Got a timeout! fail with a timeout error
+ return false
+ }
+}
+
+func TestProxy_1_1_3_Add_NewAdapter(t *testing.T) {
+ TestProxy_AdapterId = "test-adapter"
+ TestProxy_Adapter.Id = TestProxy_AdapterId
+ preAddExecuted := make(chan struct{})
+ postAddExecuted := make(chan struct{})
+ preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
+
+ // Register ADD instructions callbacks
+ TestProxy_Root_Adapter.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions for adapters", &preAddExecutedPtr)
+ TestProxy_Root_Adapter.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions for adapters", &postAddExecutedPtr)
+
+ // Add the adapter
+ if added := TestProxy_Root_Adapter.Add(context.Background(), "/adapters", TestProxy_Adapter, ""); added == nil {
+ t.Error("Failed to add adapter")
+ } else {
+ t.Logf("Added adapter : %+v", added)
+ }
+
+ verifyGotResponse(postAddExecuted)
+
+	// Verify that the added adapter can now be retrieved
+ if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find added adapter")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found adapter: %s", string(djson))
+ }
+
+ if !verifyGotResponse(preAddExecuted) {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !verifyGotResponse(postAddExecuted) {
+ t.Error("POST_ADD callback was not executed")
+ }
+}
+
+func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
+ devices := TestProxy_Root_Device.Get(context.Background(), "/devices", 1, false, "")
+
+ if len(devices.([]interface{})) == 0 {
+ t.Error("there are no available devices to retrieve")
+ } else {
+ // Save the target device id for later tests
+ TestProxy_TargetDeviceId = devices.([]interface{})[0].(*voltha.Device).Id
+ t.Logf("retrieved all devices: %+v", devices)
+ }
+}
+
+func TestProxy_1_2_2_Get_SingleDevice(t *testing.T) {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", TestProxy_TargetDeviceId)
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device: %s", string(djson))
+ }
+}
+
+func TestProxy_1_3_1_Update_Device(t *testing.T) {
+ var fwVersion int
+
+ preUpdateExecuted := make(chan struct{})
+ postUpdateExecuted := make(chan struct{})
+ preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
+
+ if retrieved := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get device")
+ } else {
+ t.Logf("Found raw device (root proxy): %+v", retrieved)
+
+ if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
+ fwVersion++
+ }
+
+ retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
+
+ TestProxy_Root_Device.RegisterCallback(
+ PRE_UPDATE,
+ commonChanCallback,
+ "PRE_UPDATE instructions (root proxy)", &preUpdateExecutedPtr,
+ )
+ TestProxy_Root_Device.RegisterCallback(
+ POST_UPDATE,
+ commonChanCallback,
+ "POST_UPDATE instructions (root proxy)", &postUpdateExecutedPtr,
+ )
+
+ if afterUpdate := TestProxy_Root_Device.Update(context.Background(), "/devices/"+TestProxy_TargetDeviceId, retrieved, false, ""); afterUpdate == nil {
+ t.Error("Failed to update device")
+ } else {
+ t.Logf("Updated device : %+v", afterUpdate)
+ }
+
+ if !verifyGotResponse(preUpdateExecuted) {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !verifyGotResponse(postUpdateExecuted) {
+ t.Error("POST_UPDATE callback was not executed")
+ }
+
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated device (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device (root proxy): %s raw: %+v", string(djson), d)
+ }
+ }
+}
+
+func TestProxy_1_3_2_Update_DeviceFlows(t *testing.T) {
+	// Get a device flows proxy and update a specific flow entry
+ devFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", false)
+ flows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ flows.(*openflow_13.Flows).Items[0].TableId = 2244
+
+ preUpdateExecuted := make(chan struct{})
+ postUpdateExecuted := make(chan struct{})
+ preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
+
+ devFlowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonChanCallback,
+ "PRE_UPDATE instructions (flows proxy)", &preUpdateExecutedPtr,
+ )
+ devFlowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonChanCallback,
+ "POST_UPDATE instructions (flows proxy)", &postUpdateExecutedPtr,
+ )
+
+ kvFlows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+
+ if reflect.DeepEqual(flows, kvFlows) {
+ t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
+ }
+
+ if updated := devFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ t.Error("Failed to update flow")
+ } else {
+ t.Logf("Updated flows : %+v", updated)
+ }
+
+ if !verifyGotResponse(preUpdateExecuted) {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !verifyGotResponse(postUpdateExecuted) {
+ t.Error("POST_UPDATE callback was not executed")
+ }
+
+ if d := devFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
+ t.Error("Failed to find updated flows (flows proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (flows proxy): %s", string(djson))
+ }
+
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated flows (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (root proxy): %s", string(djson))
+ }
+}
+
+func TestProxy_1_3_3_Update_Adapter(t *testing.T) {
+ preUpdateExecuted := make(chan struct{})
+ postUpdateExecuted := make(chan struct{})
+ preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
+
+ adaptersProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/adapters", false)
+
+ if retrieved := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get adapter")
+ } else {
+ t.Logf("Found raw adapter (root proxy): %+v", retrieved)
+
+ retrieved.(*voltha.Adapter).Version = "test-adapter-version-2"
+
+ adaptersProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonChanCallback,
+ "PRE_UPDATE instructions for adapters", &preUpdateExecutedPtr,
+ )
+ adaptersProxy.RegisterCallback(
+ POST_UPDATE,
+ commonChanCallback,
+ "POST_UPDATE instructions for adapters", &postUpdateExecutedPtr,
+ )
+
+ if afterUpdate := adaptersProxy.Update(context.Background(), "/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
+ t.Error("Failed to update adapter")
+ } else {
+ t.Logf("Updated adapter : %+v", afterUpdate)
+ }
+
+ if !verifyGotResponse(preUpdateExecuted) {
+ t.Error("PRE_UPDATE callback for adapter was not executed")
+ }
+ if !verifyGotResponse(postUpdateExecuted) {
+ t.Error("POST_UPDATE callback for adapter was not executed")
+ }
+
+ if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated adapter (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found adapter (root proxy): %s raw: %+v", string(djson), d)
+ }
+ }
+}
+
+func TestProxy_1_4_1_Remove_Device(t *testing.T) {
+ preRemoveExecuted := make(chan struct{})
+ postRemoveExecuted := make(chan struct{})
+ preRemoveExecutedPtr, postRemoveExecutedPtr := preRemoveExecuted, postRemoveExecuted
+
+ TestProxy_Root_Device.RegisterCallback(
+ PRE_REMOVE,
+ commonChanCallback,
+ "PRE_REMOVE instructions (root proxy)", &preRemoveExecutedPtr,
+ )
+ TestProxy_Root_Device.RegisterCallback(
+ POST_REMOVE,
+ commonChanCallback,
+ "POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
+ )
+
+ if removed := TestProxy_Root_Device.Remove(context.Background(), "/devices/"+TestProxy_DeviceId, ""); removed == nil {
+ t.Error("Failed to remove device")
+ } else {
+ t.Logf("Removed device : %+v", removed)
+ }
+
+ if !verifyGotResponse(preRemoveExecuted) {
+ t.Error("PRE_REMOVE callback was not executed")
+ }
+ if !verifyGotResponse(postRemoveExecuted) {
+ t.Error("POST_REMOVE callback was not executed")
+ }
+
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ djson, _ := json.Marshal(d)
+ t.Errorf("Device was not removed - %s", djson)
+ } else {
+ t.Logf("Device was removed: %s", TestProxy_DeviceId)
+ }
+}
+
+func TestProxy_2_1_1_Add_NewLogicalDevice(t *testing.T) {
+
+ ldIDBin, _ := uuid.New().MarshalBinary()
+ TestProxy_LogicalDeviceId = "0001" + hex.EncodeToString(ldIDBin)[:12]
+ TestProxy_LogicalDevice.Id = TestProxy_LogicalDeviceId
+
+ preAddExecuted := make(chan struct{})
+ postAddExecuted := make(chan struct{})
+ preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
+
+	// Register ADD instructions callbacks
+ TestProxy_Root_LogicalDevice.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
+ TestProxy_Root_LogicalDevice.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
+
+ if added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, ""); added == nil {
+ t.Error("Failed to add logical device")
+ } else {
+ t.Logf("Added logical device : %+v", added)
+ }
+
+ verifyGotResponse(postAddExecuted)
+
+ if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ t.Error("Failed to find added logical device")
+ } else {
+ ldJSON, _ := json.Marshal(ld)
+ t.Logf("Found logical device: %s", string(ldJSON))
+ }
+
+ if !verifyGotResponse(preAddExecuted) {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !verifyGotResponse(postAddExecuted) {
+ t.Error("POST_ADD callback was not executed")
+ }
+}
+
+func TestProxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
+ TestProxy_LogicalDevice.Id = TestProxy_LogicalDeviceId
+
+ added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, "")
+ if added.(proto.Message).String() != reflect.ValueOf(TestProxy_LogicalDevice).Interface().(proto.Message).String() {
+ t.Errorf("Logical devices don't match - existing: %+v returned: %+v", TestProxy_LogicalDevice, added)
+ }
+}
+
+func TestProxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
+ logicalDevices := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices", 1, false, "")
+
+ if len(logicalDevices.([]interface{})) == 0 {
+ t.Error("there are no available logical devices to retrieve")
+ } else {
+		// Save the target logical device id for later tests
+ TestProxy_TargetLogicalDeviceId = logicalDevices.([]interface{})[0].(*voltha.LogicalDevice).Id
+ t.Logf("retrieved all logical devices: %+v", logicalDevices)
+ }
+}
+
+func TestProxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
+ if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ t.Errorf("Failed to find logical device : %s", TestProxy_TargetLogicalDeviceId)
+ } else {
+ ldJSON, _ := json.Marshal(ld)
+ t.Logf("Found logical device: %s", string(ldJSON))
+ }
+
+}
+
+func TestProxy_2_3_1_Update_LogicalDevice(t *testing.T) {
+ var fwVersion int
+ preUpdateExecuted := make(chan struct{})
+ postUpdateExecuted := make(chan struct{})
+ preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
+
+ if retrieved := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get logical device")
+ } else {
+ t.Logf("Found raw logical device (root proxy): %+v", retrieved)
+
+ if retrieved.(*voltha.LogicalDevice).RootDeviceId == "" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.LogicalDevice).RootDeviceId)
+ fwVersion++
+ }
+
+ TestProxy_Root_LogicalDevice.RegisterCallback(
+ PRE_UPDATE,
+ commonChanCallback,
+ "PRE_UPDATE instructions (root proxy)", &preUpdateExecutedPtr,
+ )
+ TestProxy_Root_LogicalDevice.RegisterCallback(
+ POST_UPDATE,
+ commonChanCallback,
+ "POST_UPDATE instructions (root proxy)", &postUpdateExecutedPtr,
+ )
+
+ retrieved.(*voltha.LogicalDevice).RootDeviceId = strconv.Itoa(fwVersion)
+
+ if afterUpdate := TestProxy_Root_LogicalDevice.Update(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, retrieved, false,
+ ""); afterUpdate == nil {
+ t.Error("Failed to update logical device")
+ } else {
+ t.Logf("Updated logical device : %+v", afterUpdate)
+ }
+
+ if !verifyGotResponse(preUpdateExecuted) {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !verifyGotResponse(postUpdateExecuted) {
+ t.Error("POST_UPDATE callback was not executed")
+ }
+
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated logical device (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+
+ t.Logf("Found logical device (root proxy): %s raw: %+v", string(djson), d)
+ }
+ }
+}
+
+func TestProxy_2_3_2_Update_LogicalDeviceFlows(t *testing.T) {
+	// Get a logical device flows proxy and update a specific flow entry
+ ldFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", false)
+ flows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ flows.(*openflow_13.Flows).Items[0].TableId = rand.Uint32()
+ t.Logf("before updated flows: %+v", flows)
+
+ ldFlowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback2,
+ )
+ ldFlowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback2,
+ )
+
+ kvFlows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
+
+ if reflect.DeepEqual(flows, kvFlows) {
+ t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
+ }
+
+ if updated := ldFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ t.Error("Failed to update logical device flows")
+ } else {
+ t.Logf("Updated logical device flows : %+v", updated)
+ }
+
+ if d := ldFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
+ t.Error("Failed to find updated logical device flows (flows proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (flows proxy): %s", string(djson))
+ }
+
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated logical device flows (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found logical device flows (root proxy): %s", string(djson))
+ }
+}
+
+func TestProxy_2_4_1_Remove_Device(t *testing.T) {
+ preRemoveExecuted := make(chan struct{})
+ postRemoveExecuted := make(chan struct{})
+ preRemoveExecutedPtr, postRemoveExecutedPtr := preRemoveExecuted, postRemoveExecuted
+
+ TestProxy_Root_LogicalDevice.RegisterCallback(
+ PRE_REMOVE,
+ commonChanCallback,
+ "PRE_REMOVE instructions (root proxy)", &preRemoveExecutedPtr,
+ )
+ TestProxy_Root_LogicalDevice.RegisterCallback(
+ POST_REMOVE,
+ commonChanCallback,
+ "POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
+ )
+
+ if removed := TestProxy_Root_LogicalDevice.Remove(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, ""); removed == nil {
+ t.Error("Failed to remove logical device")
+ } else {
+ t.Logf("Removed device : %+v", removed)
+ }
+
+ if !verifyGotResponse(preRemoveExecuted) {
+ t.Error("PRE_REMOVE callback was not executed")
+ }
+ if !verifyGotResponse(postRemoveExecuted) {
+ t.Error("POST_REMOVE callback was not executed")
+ }
+
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ djson, _ := json.Marshal(d)
+ t.Errorf("Device was not removed - %s", djson)
+ } else {
+ t.Logf("Device was removed: %s", TestProxy_LogicalDeviceId)
+ }
+}
+
+// -----------------------------
+// Callback tests
+// -----------------------------
+
+func TestProxy_Callbacks_1_Register(t *testing.T) {
+ TestProxy_Root_Device.RegisterCallback(PRE_ADD, firstCallback, "abcde", "12345")
+
+ m := make(map[string]string)
+ m["name"] = "fghij"
+ TestProxy_Root_Device.RegisterCallback(PRE_ADD, secondCallback, m, 1.2345)
+
+ d := &voltha.Device{Id: "12345"}
+ TestProxy_Root_Device.RegisterCallback(PRE_ADD, thirdCallback, "klmno", d)
+}
+
+func TestProxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
+ TestProxy_Root_Device.InvokeCallbacks(PRE_ADD, false, nil)
+}
+
+func TestProxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
+ TestProxy_Root_Device.InvokeCallbacks(PRE_ADD, true, nil)
+}
+
+func TestProxy_Callbacks_4_Unregister(t *testing.T) {
+ TestProxy_Root_Device.UnregisterCallback(PRE_ADD, firstCallback)
+ TestProxy_Root_Device.UnregisterCallback(PRE_ADD, secondCallback)
+ TestProxy_Root_Device.UnregisterCallback(PRE_ADD, thirdCallback)
+}
+
+//func TestProxy_Callbacks_5_Add(t *testing.T) {
+// TestProxy_Root_Device.Root.AddCallback(TestProxy_Root_Device.InvokeCallbacks, POST_UPDATE, false, "some data", "some new data")
+//}
+//
+//func TestProxy_Callbacks_6_Execute(t *testing.T) {
+// TestProxy_Root_Device.Root.ExecuteCallbacks()
+//}
diff --git a/db/model/revision.go b/db/model/revision.go
new file mode 100644
index 0000000..29fc5e9
--- /dev/null
+++ b/db/model/revision.go
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
+ "time"
+)
+
// Revision is the interface implemented by the immutable versions of a
// node's data kept in the model tree.  Implementations exist for purely
// in-memory revisions (NonPersistedRevision) and KV-store backed ones
// (PersistedRevision).
type Revision interface {
	Finalize(bool)

	// Config (data payload) accessors.
	SetConfig(revision *DataRevision)
	GetConfig() *DataRevision

	// Branch / storage clean-up operations.
	Drop(txid string, includeConfig bool)
	StorageDrop(txid string, includeConfig bool)

	// Child revision management, keyed by child (field) name.
	ChildDrop(childType string, childHash string)
	ChildDropByName(childName string)
	SetChildren(name string, children []Revision)
	GetChildren(name string) []Revision
	SetAllChildren(children map[string][]Revision)
	GetAllChildren() map[string][]Revision

	// Hash identifying this revision.
	SetHash(hash string)
	GetHash() string
	ClearHash()

	getVersion() int64

	// SetupWatch presumably registers a watch on the given KV-store key —
	// confirm against the persisted revision implementation.
	SetupWatch(key string)

	SetName(name string)
	GetName() string
	SetBranch(branch *Branch)
	GetBranch() *Branch
	Get(int) interface{}
	GetData() interface{}
	GetNode() *node
	SetLastUpdate(ts ...time.Time)
	GetLastUpdate() time.Time

	// LoadFromPersistence rebuilds revisions from KV-store blobs.
	LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision

	// Update* operations produce a new revision on the given branch.
	UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision
	UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision
	UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision
}
diff --git a/db/model/root.go b/db/model/root.go
new file mode 100644
index 0000000..8ac1311
--- /dev/null
+++ b/db/model/root.go
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "context"
+ "encoding/hex"
+ "encoding/json"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/db"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "reflect"
+ "sync"
+)
+
// Root is used to provide an abstraction to the base root structure
type Root interface {
	Node

	// ExecuteCallbacks drains and runs the callbacks queued on the root.
	ExecuteCallbacks()
	// AddCallback queues a callback with its arguments for later execution.
	AddCallback(callback CallbackFunction, args ...interface{})
	// AddNotificationCallback queues a notification callback with its arguments.
	AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
+
// root points to the top of the data model tree or sub-tree identified by a proxy
type root struct {
	*node

	// Callbacks queued for execution by ExecuteCallbacks.  Notification
	// callbacks are kept separately (their draining is currently disabled —
	// see ExecuteCallbacks).
	Callbacks             []CallbackTuple
	NotificationCallbacks []CallbackTuple

	// DirtyNodes tracks, per transaction id, the nodes touched by that
	// transaction so their branches can be cleaned up on fold/delete.
	DirtyNodes map[string][]*node
	// KvStore is the optional persistence backend; nil means in-memory only.
	KvStore *db.Backend
	Loading bool
	// RevisionClass holds the reflect.Type of the revision implementation
	// instantiated by MakeRevision.
	RevisionClass interface{}

	// mutex guards Callbacks and NotificationCallbacks.
	mutex sync.RWMutex
}
+
+// NewRoot creates an new instance of a root object
+func NewRoot(initialData interface{}, kvStore *db.Backend) *root {
+ root := &root{}
+
+ root.KvStore = kvStore
+ root.DirtyNodes = make(map[string][]*node)
+ root.Loading = false
+
+ // If there is no storage in place just revert to
+ // a non persistent mechanism
+ if kvStore != nil {
+ root.RevisionClass = reflect.TypeOf(PersistedRevision{})
+ } else {
+ root.RevisionClass = reflect.TypeOf(NonPersistedRevision{})
+ }
+
+ root.Callbacks = []CallbackTuple{}
+ root.NotificationCallbacks = []CallbackTuple{}
+
+ root.node = NewNode(root, initialData, false, "")
+
+ return root
+}
+
+// MakeTxBranch creates a new transaction branch
+func (r *root) MakeTxBranch() string {
+ txidBin, _ := uuid.New().MarshalBinary()
+ txid := hex.EncodeToString(txidBin)[:12]
+
+ r.DirtyNodes[txid] = []*node{r.node}
+ r.node.MakeBranch(txid)
+
+ return txid
+}
+
+// DeleteTxBranch removes a transaction branch
+func (r *root) DeleteTxBranch(txid string) {
+ for _, dirtyNode := range r.DirtyNodes[txid] {
+ dirtyNode.DeleteBranch(txid)
+ }
+ delete(r.DirtyNodes, txid)
+ r.node.DeleteBranch(txid)
+}
+
+// FoldTxBranch will merge the contents of a transaction branch with the root object
+func (r *root) FoldTxBranch(txid string) {
+ // Start by doing a dry run of the merge
+ // If that fails, it bails out and the branch is deleted
+ if _, err := r.node.MergeBranch(txid, true); err != nil {
+ // Merge operation fails
+ r.DeleteTxBranch(txid)
+ } else {
+ r.node.MergeBranch(txid, false)
+ r.node.GetRoot().ExecuteCallbacks()
+ r.DeleteTxBranch(txid)
+ }
+}
+
// ExecuteCallbacks will invoke all the callbacks linked to root object
func (r *root) ExecuteCallbacks() {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Drain the queue one entry at a time, launching each callback on its
	// own goroutine while the lock is held on the queue itself.
	for len(r.Callbacks) > 0 {
		callback := r.Callbacks[0]
		r.Callbacks = r.Callbacks[1:]
		go callback.Execute(nil)
	}
	// NOTE: notification callbacks are queued by AddNotificationCallback but
	// are currently never drained — the code below is disabled.
	//for len(r.NotificationCallbacks) > 0 {
	//	callback := r.NotificationCallbacks[0]
	//	r.NotificationCallbacks = r.NotificationCallbacks[1:]
	//	go callback.Execute(nil)
	//}
}
+
+func (r *root) hasCallbacks() bool {
+ return len(r.Callbacks) == 0
+}
+
+// getCallbacks returns the available callbacks
+func (r *root) GetCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ return r.Callbacks
+}
+
+// getCallbacks returns the available notification callbacks
+func (r *root) GetNotificationCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ return r.NotificationCallbacks
+}
+
// AddCallback inserts a new callback with its arguments
// The callback is queued; it runs only when ExecuteCallbacks drains the queue.
func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
}
+
// AddNotificationCallback inserts a new notification callback with its arguments
// NOTE: the notification queue is currently never drained — the corresponding
// loop in ExecuteCallbacks is commented out.
func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
+
+func (r *root) syncParent(childRev Revision, txid string) {
+ data := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))
+
+ for fieldName, _ := range ChildrenFields(data) {
+ childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
+ if reflect.TypeOf(childRev.GetData()) == reflect.TypeOf(childDataHolder.Interface()) {
+ childDataHolder = reflect.ValueOf(childRev.GetData())
+ reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
+ }
+ }
+
+ r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), data))
+ r.GetProxy().ParentNode.Latest(txid).Finalize(false)
+}
+
// Update modifies the content of an object at a given path with the provided data
func (r *root) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
	var result Revision

	// A caller-supplied branch maker is not supported at the root level.
	if makeBranch != nil {
		// TODO: raise error
	}

	if r.hasCallbacks() {
		// TODO: raise error
	}

	if txid != "" {
		// Record every node touched by this transaction so its branch can be
		// cleaned up when the transaction is folded or deleted.
		trackDirty := func(node *node) *Branch {
			r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
			return node.MakeBranch(txid)
		}
		result = r.node.Update(ctx, path, data, strict, txid, trackDirty)
	} else {
		result = r.node.Update(ctx, path, data, strict, "", nil)
	}

	if result != nil {
		// A proxy whose full path differs from its path refers to a sub-tree;
		// in that case the change must be propagated up to the parent node.
		if r.GetProxy().FullPath != r.GetProxy().Path {
			r.syncParent(result, txid)
		} else {
			result.Finalize(false)
		}
	}

	r.node.GetRoot().ExecuteCallbacks()

	return result
}
+
+// Add creates a new object at the given path with the provided data
+func (r *root) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+ var result Revision
+
+ if makeBranch != nil {
+ // TODO: raise error
+ }
+
+ if r.hasCallbacks() {
+ // TODO: raise error
+ }
+
+ if txid != "" {
+ trackDirty := func(node *node) *Branch {
+ r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
+ return node.MakeBranch(txid)
+ }
+ result = r.node.Add(ctx, path, data, txid, trackDirty)
+ } else {
+ result = r.node.Add(ctx, path, data, "", nil)
+ }
+
+ if result != nil {
+ result.Finalize(true)
+ r.node.GetRoot().ExecuteCallbacks()
+ }
+ return result
+}
+
+// Remove discards an object at a given path
+func (r *root) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
+ var result Revision
+
+ if makeBranch != nil {
+ // TODO: raise error
+ }
+
+ if r.hasCallbacks() {
+ // TODO: raise error
+ }
+
+ if txid != "" {
+ trackDirty := func(node *node) *Branch {
+ r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
+ return node.MakeBranch(txid)
+ }
+ result = r.node.Remove(ctx, path, txid, trackDirty)
+ } else {
+ result = r.node.Remove(ctx, path, "", nil)
+ }
+
+ r.node.GetRoot().ExecuteCallbacks()
+
+ return result
+}
+
// MakeLatest updates a branch with the latest node revision
// It delegates to the unexported makeLatest, which also persists the root
// reference when a KV store is attached.
func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
	r.makeLatest(branch, revision, changeAnnouncement)
}
+
+func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ if r.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
+ return NewPersistedRevision(branch, data, children)
+ }
+
+ return NewNonPersistedRevision(r, branch, data, children)
+}
+
+func (r *root) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+ r.node.makeLatest(branch, revision, changeAnnouncement)
+
+ if r.KvStore != nil && branch.Txid == "" {
+ tags := make(map[string]string)
+ for k, v := range r.node.Tags {
+ tags[k] = v.GetHash()
+ }
+ data := &rootData{
+ Latest: branch.GetLatest().GetHash(),
+ Tags: tags,
+ }
+ if blob, err := json.Marshal(data); err != nil {
+ // TODO report error
+ } else {
+ log.Debugf("Changing root to : %s", string(blob))
+ if err := r.KvStore.Put("root", blob); err != nil {
+ log.Errorf("failed to properly put value in kvstore - err: %s", err.Error())
+ }
+ }
+ }
+}
+
// rootData is the JSON blob persisted under the "root" key: the hash of the
// latest root revision plus the hash recorded for each tag.
type rootData struct {
	Latest string            `json:"latest"`
	Tags   map[string]string `json:"tags"`
}
diff --git a/db/model/transaction.go b/db/model/transaction.go
new file mode 100644
index 0000000..d7a34e7
--- /dev/null
+++ b/db/model/transaction.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
// Transaction bundles model operations under a single transaction id so they
// can be committed or cancelled as a group through the owning proxy.
type Transaction struct {
	// proxy through which all operations are performed
	proxy *Proxy
	// txid is the transaction identifier; cleared on Commit or Cancel
	txid string
}
+
+func NewTransaction(proxy *Proxy, txid string) *Transaction {
+ tx := &Transaction{
+ proxy: proxy,
+ txid: txid,
+ }
+ return tx
+}
+func (t *Transaction) Get(ctx context.Context, path string, depth int, deep bool) interface{} {
+ if t.txid == "" {
+ log.Errorf("closed transaction")
+ return nil
+ }
+ // TODO: need to review the return values at the different layers!!!!!
+ return t.proxy.Get(ctx, path, depth, deep, t.txid)
+}
+func (t *Transaction) Update(ctx context.Context, path string, data interface{}, strict bool) interface{} {
+ if t.txid == "" {
+ log.Errorf("closed transaction")
+ return nil
+ }
+ return t.proxy.Update(ctx, path, data, strict, t.txid)
+}
+func (t *Transaction) Add(ctx context.Context, path string, data interface{}) interface{} {
+ if t.txid == "" {
+ log.Errorf("closed transaction")
+ return nil
+ }
+ return t.proxy.Add(ctx, path, data, t.txid)
+}
+func (t *Transaction) Remove(ctx context.Context, path string) interface{} {
+ if t.txid == "" {
+ log.Errorf("closed transaction")
+ return nil
+ }
+ return t.proxy.Remove(ctx, path, t.txid)
+}
// Cancel aborts the transaction on the proxy and marks this handle as closed
// (subsequent operations will fail with "closed transaction").
func (t *Transaction) Cancel() {
	t.proxy.cancelTransaction(t.txid)
	t.txid = ""
}
// Commit applies the transaction on the proxy and marks this handle as closed
// (subsequent operations will fail with "closed transaction").
func (t *Transaction) Commit() {
	t.proxy.commitTransaction(t.txid)
	t.txid = ""
}
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
new file mode 100644
index 0000000..c33e5be
--- /dev/null
+++ b/db/model/transaction_test.go
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "context"
+ "encoding/hex"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "strconv"
+ "testing"
+)
+
// Shared state for the ordered transaction tests below.
var (
	TestTransaction_Root      *root
	TestTransaction_RootProxy *Proxy
	// TestTransaction_TargetDeviceId is the id reported back by the Add call.
	TestTransaction_TargetDeviceId string
	// TestTransaction_DeviceId is the id generated locally in test 2.
	TestTransaction_DeviceId string
)
+
// init builds an in-memory model tree (no KV backend) and a proxy rooted at
// "/" that the transaction tests operate against.
func init() {
	TestTransaction_Root = NewRoot(&voltha.Voltha{}, nil)
	TestTransaction_RootProxy = TestTransaction_Root.node.CreateProxy(context.Background(), "/", false)
}
+
+//func TestTransaction_1_GetDevices(t *testing.T) {
+// getTx := TestTransaction_RootProxy.OpenTransaction()
+//
+// devices := getTx.Get("/devices", 1, false)
+//
+// if len(devices.([]interface{})) == 0 {
+// t.Error("there are no available devices to retrieve")
+// } else {
+// // Save the target device id for later tests
+// TestTransaction_TargetDeviceId = devices.([]interface{})[0].(*voltha.Device).Id
+// t.Logf("retrieved devices: %+v", devices)
+// }
+//
+// getTx.Commit()
+//}
+
+func TestTransaction_2_AddDevice(t *testing.T) {
+ devIDBin, _ := uuid.New().MarshalBinary()
+ TestTransaction_DeviceId = "0001" + hex.EncodeToString(devIDBin)[:12]
+
+ ports := []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "test-port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "etcd_port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+
+ device := &voltha.Device{
+ Id: TestTransaction_DeviceId,
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ Ports: ports,
+ }
+
+ addTx := TestTransaction_RootProxy.OpenTransaction()
+
+ if added := addTx.Add(context.Background(), "/devices", device); added == nil {
+ t.Error("Failed to add device")
+ } else {
+ TestTransaction_TargetDeviceId = added.(*voltha.Device).Id
+ t.Logf("Added device : %+v", added)
+ }
+ addTx.Commit()
+}
+
+func TestTransaction_3_GetDevice_PostAdd(t *testing.T) {
+
+ basePath := "/devices/" + TestTransaction_DeviceId
+
+ getDevWithPortsTx := TestTransaction_RootProxy.OpenTransaction()
+ device1 := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ t.Logf("retrieved device with ports: %+v", device1)
+ getDevWithPortsTx.Commit()
+
+ getDevTx := TestTransaction_RootProxy.OpenTransaction()
+ device2 := getDevTx.Get(context.Background(), basePath, 0, false)
+ t.Logf("retrieved device: %+v", device2)
+
+ getDevTx.Commit()
+}
+
// TestTransaction_4_UpdateDevice bumps the device firmware version inside a
// transaction and commits the change.
func TestTransaction_4_UpdateDevice(t *testing.T) {
	updateTx := TestTransaction_RootProxy.OpenTransaction()
	if retrieved := updateTx.Get(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, 1, false); retrieved == nil {
		t.Error("Failed to get device")
	} else {
		// Derive the next firmware version: start at 0 when unset ("n/a"),
		// otherwise increment the current numeric value.
		var fwVersion int
		if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
			fwVersion = 0
		} else {
			fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
			fwVersion++
		}

		//cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
		retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
		t.Logf("Before update : %+v", retrieved)

		// FIXME: The makeBranch passed in function is nil or not being executed properly!!!!!
		if afterUpdate := updateTx.Update(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, retrieved, false); afterUpdate == nil {
			t.Error("Failed to update device")
		} else {
			t.Logf("Updated device : %+v", afterUpdate)
		}
	}
	updateTx.Commit()
}
+
+func TestTransaction_5_GetDevice_PostUpdate(t *testing.T) {
+
+ basePath := "/devices/" + TestTransaction_DeviceId
+
+ getDevWithPortsTx := TestTransaction_RootProxy.OpenTransaction()
+ device1 := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ t.Logf("retrieved device with ports: %+v", device1)
+ getDevWithPortsTx.Commit()
+
+ getDevTx := TestTransaction_RootProxy.OpenTransaction()
+ device2 := getDevTx.Get(context.Background(), basePath, 0, false)
+ t.Logf("retrieved device: %+v", device2)
+
+ getDevTx.Commit()
+}
+
+func TestTransaction_6_RemoveDevice(t *testing.T) {
+ removeTx := TestTransaction_RootProxy.OpenTransaction()
+ if removed := removeTx.Remove(context.Background(), "/devices/"+TestTransaction_DeviceId); removed == nil {
+ t.Error("Failed to remove device")
+ } else {
+ t.Logf("Removed device : %+v", removed)
+ }
+ removeTx.Commit()
+}
+
+func TestTransaction_7_GetDevice_PostRemove(t *testing.T) {
+
+ basePath := "/devices/" + TestTransaction_DeviceId
+
+ getDevTx := TestTransaction_RootProxy.OpenTransaction()
+ device := TestTransaction_RootProxy.Get(context.Background(), basePath, 0, false, "")
+ t.Logf("retrieved device: %+v", device)
+
+ getDevTx.Commit()
+}
diff --git a/db/model/utils.go b/db/model/utils.go
new file mode 100644
index 0000000..b28e92f
--- /dev/null
+++ b/db/model/utils.go
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+ "bytes"
+ "encoding/gob"
+ "reflect"
+ "strings"
+)
+
// IsProtoMessage determines whether the supplied object looks like a
// proto.Message, detected by the presence of a ProtoMessage method on its
// type.  A nil object is never a proto message.
func IsProtoMessage(object interface{}) bool {
	if object == nil {
		return false
	}
	_, found := reflect.TypeOf(object).MethodByName("ProtoMessage")
	return found
}
+
// FindOwnerType will traverse a data structure and find the parent type of the specified object
// `name` is matched against the first entry of each struct field's json tag;
// `found` flags that the named field was located on a previous recursion
// level, in which case the current type is returned.  `depth` only feeds the
// (currently unused) debug prefix below.
func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
	prefix := ""
	for d := 0; d < depth; d++ {
		prefix += ">>"
	}
	k := obj.Kind()
	switch k {
	case reflect.Ptr:
		if found {
			return obj.Type()
		}

		// Recurse into a freshly allocated value of the pointee type.
		t := obj.Type().Elem()
		n := reflect.New(t)

		if rc := FindOwnerType(n.Elem(), name, depth+1, found); rc != nil {
			return rc
		}

	case reflect.Struct:
		if found {
			return obj.Type()
		}

		for i := 0; i < obj.NumField(); i++ {
			v := reflect.Indirect(obj)

			json := strings.Split(v.Type().Field(i).Tag.Get("json"), ",")

			if json[0] == name {
				// Named field found; recurse once more with found=true so the
				// matching branch reports the owning type.
				return FindOwnerType(obj.Field(i), name, depth+1, true)
			}

			if rc := FindOwnerType(obj.Field(i), name, depth+1, found); rc != nil {
				return rc
			}
		}
	case reflect.Slice:
		// Probe a one-element instance of the slice type to obtain a concrete
		// element value to descend into.
		s := reflect.MakeSlice(obj.Type(), 1, 1)
		n := reflect.New(obj.Type())
		n.Elem().Set(s)

		for i := 0; i < n.Elem().Len(); i++ {
			if found {
				return reflect.ValueOf(n.Elem().Index(i).Interface()).Type()
			}
		}

		for i := 0; i < obj.Len(); i++ {
			if found {
				return obj.Index(i).Type()
			}

			if rc := FindOwnerType(obj.Index(i), name, depth+1, found); rc != nil {
				return rc
			}
		}
	default:
		//log.Debugf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
	}

	// The named field was not found anywhere under obj.
	return nil
}
+
// FindKeyOwner will traverse a structure to find the owner type of the specified name
// The `name` is matched against the first entry of each struct field's json
// tag; on a match, the field's reflect.Type is returned (as interface{}),
// nil when no field matches.  `depth` is carried along but unused.
func FindKeyOwner(iface interface{}, name string, depth int) interface{} {
	obj := reflect.ValueOf(iface)
	k := obj.Kind()
	switch k {
	case reflect.Ptr:
		// Recurse into a freshly allocated value of the pointee type.
		t := obj.Type().Elem()
		n := reflect.New(t)

		if rc := FindKeyOwner(n.Elem().Interface(), name, depth+1); rc != nil {
			return rc
		}

	case reflect.Struct:
		for i := 0; i < obj.NumField(); i++ {
			json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")

			if json[0] == name {
				return obj.Type().Field(i).Type
			}

			if rc := FindKeyOwner(obj.Field(i).Interface(), name, depth+1); rc != nil {
				return rc
			}
		}

	case reflect.Slice:
		// Probe a one-element instance of the slice type to descend into a
		// concrete element value.
		s := reflect.MakeSlice(obj.Type(), 1, 1)
		n := reflect.New(obj.Type())
		n.Elem().Set(s)

		for i := 0; i < n.Elem().Len(); i++ {
			if rc := FindKeyOwner(n.Elem().Index(i).Interface(), name, depth+1); rc != nil {
				return rc
			}
		}
	default:
		//log.Debugf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
	}

	return nil
}
+
// GetAttributeValue traverse a structure to find the value of an attribute
// `name` is matched against the first entry of each struct field's json tag;
// on a match it returns the Go field name and the field's reflect.Value.
// When nothing matches, both return values are zero (attribValue.IsValid()
// is false).  `depth` is carried along but unused.
// FIXME: Need to figure out if GetAttributeValue and GetAttributeStructure can become one
// Code is repeated in both, but outputs have a different purpose
// Left as-is for now to get things working
func GetAttributeValue(data interface{}, name string, depth int) (string, reflect.Value) {
	var attribName string
	var attribValue reflect.Value
	obj := reflect.ValueOf(data)

	if !obj.IsValid() {
		return attribName, attribValue
	}

	k := obj.Kind()
	switch k {
	case reflect.Ptr:
		if obj.IsNil() {
			return attribName, attribValue
		}

		// Dereference and search the pointee.
		if attribName, attribValue = GetAttributeValue(obj.Elem().Interface(), name, depth+1); attribValue.IsValid() {
			return attribName, attribValue
		}

	case reflect.Struct:
		for i := 0; i < obj.NumField(); i++ {
			json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")

			if json[0] == name {
				return obj.Type().Field(i).Name, obj.Field(i)
			}

			if obj.Field(i).IsValid() {
				if attribName, attribValue = GetAttributeValue(obj.Field(i).Interface(), name, depth+1); attribValue.IsValid() {
					return attribName, attribValue
				}
			}
		}

	case reflect.Slice:
		// NOTE(review): s/n are built but the loop below walks the actual
		// slice elements (obj), unlike the probe pattern used elsewhere.
		s := reflect.MakeSlice(obj.Type(), 1, 1)
		n := reflect.New(obj.Type())
		n.Elem().Set(s)

		for i := 0; i < obj.Len(); i++ {
			if attribName, attribValue = GetAttributeValue(obj.Index(i).Interface(), name, depth+1); attribValue.IsValid() {
				return attribName, attribValue
			}
		}
	default:
		//log.Debugf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
	}

	return attribName, attribValue

}
+
// GetAttributeStructure will traverse a structure to find the data structure for the named attribute
// `name` is matched against the first entry of each struct field's json tag;
// on a match it returns the matching reflect.StructField (zero StructField —
// empty Name — when nothing matches).  `depth` is carried along but unused.
// FIXME: See GetAttributeValue(...) comment
func GetAttributeStructure(data interface{}, name string, depth int) reflect.StructField {
	var result reflect.StructField
	obj := reflect.ValueOf(data)

	if !obj.IsValid() {
		return result
	}

	k := obj.Kind()
	switch k {
	case reflect.Ptr:
		// Recurse into a freshly allocated value of the pointee type.
		t := obj.Type().Elem()
		n := reflect.New(t)

		if rc := GetAttributeStructure(n.Elem().Interface(), name, depth+1); rc.Name != "" {
			return rc
		}

	case reflect.Struct:
		for i := 0; i < obj.NumField(); i++ {
			v := reflect.Indirect(obj)
			json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")

			if json[0] == name {
				return v.Type().Field(i)
			}

			if obj.Field(i).IsValid() {
				if rc := GetAttributeStructure(obj.Field(i).Interface(), name, depth+1); rc.Name != "" {
					return rc
				}
			}
		}

	case reflect.Slice:
		// NOTE(review): s/n are built but the loop below walks the actual
		// slice elements (obj), unlike the probe pattern used elsewhere.
		s := reflect.MakeSlice(obj.Type(), 1, 1)
		n := reflect.New(obj.Type())
		n.Elem().Set(s)

		for i := 0; i < obj.Len(); i++ {
			if rc := GetAttributeStructure(obj.Index(i).Interface(), name, depth+1); rc.Name != "" {
				return rc
			}

		}
	default:
		//log.Debugf("%s Unhandled <%+v> ... It's a %+v\n", prefix, obj, k)
	}

	return result

}
+
// clone2 returns a deep copy of a, which is assumed to be a pointer to a
// gob-serializable value, by round-tripping it through a gob encode/decode
// into a freshly allocated value of the same type.
// Fix: the original decoded into b.Elem().Interface() — a non-pointer value —
// which gob rejects, so the decode silently failed and the returned value
// aliased the original instead of being a copy.  Encode/decode errors remain
// best-effort ignored, matching the original's contract.
func clone2(a interface{}) interface{} {
	buff := new(bytes.Buffer)
	enc := gob.NewEncoder(buff)
	dec := gob.NewDecoder(buff)

	// Allocate a new value of the pointee type to decode into.
	dst := reflect.New(reflect.TypeOf(a).Elem())
	enc.Encode(a)
	dec.Decode(dst.Interface())

	return dst.Interface()
}
+
// clone gob-encodes a and decodes the resulting stream into b (which must be
// a pointer to a compatible type), returning b.  Encode/decode errors are
// best-effort ignored.
func clone(a, b interface{}) interface{} {
	var buf bytes.Buffer
	gob.NewEncoder(&buf).Encode(a)
	gob.NewDecoder(&buf).Decode(b)
	return b
}