VOL-2868 Model simplification/removal.
Reduced the model to its most commonly used functions, removing unused logic and test cases.
Reworked the remaining functions to be more intuitive to use and to follow Go conventions more closely.
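
For reference, a minimal illustrative sketch of the removed Branch revision API
(mirroring the deleted branch_test.go; the txid and hash values below are examples only):

    b := NewBranch(&node{}, "example-txid",
        &NonPersistedRevision{Config: &DataRevision{}, Children: make(map[string][]Revision)}, true)
    rev := &NonPersistedRevision{Config: &DataRevision{}, Children: make(map[string][]Revision), Hash: "abc"}
    b.AddRevision(rev) // stored in the branch's revision map under rev.GetHash()
    b.SetLatest(rev)   // rev becomes the branch head
    _ = b.GetLatest()  // returns rev
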
Change-Id: Ibbb267ff37e039b73489b4379aa2654208614d5b
diff --git a/db/model/base_test.go b/db/model/base_test.go
deleted file mode 100644
index 94bb185..0000000
--- a/db/model/base_test.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "context"
- "runtime/debug"
- "sync"
-
- "github.com/opencord/voltha-protos/v3/go/voltha"
-)
-
-var callbackMutex sync.Mutex
-
-func commonChanCallback(ctx context.Context, args ...interface{}) interface{} {
- logger.Infof("Running common callback - arg count: %d", len(args))
-
- //for i := 0; i < len(args); i++ {
- // logger.Infof("ARG %d : %+v", i, args[i])
- //}
-
- callbackMutex.Lock()
- defer callbackMutex.Unlock()
-
- execDoneChan := args[1].(*chan struct{})
-
- // Inform the caller that the callback was executed
- if *execDoneChan != nil {
- logger.Infof("Sending completion indication - stack:%s", string(debug.Stack()))
- close(*execDoneChan)
- *execDoneChan = nil
- }
-
- return nil
-}
-
-func commonCallback2(ctx context.Context, args ...interface{}) interface{} {
- logger.Infof("Running common2 callback - arg count: %d %+v", len(args), args)
-
- return nil
-}
-
-func commonCallbackFunc(ctx context.Context, args ...interface{}) interface{} {
- logger.Infof("Running common callback - arg count: %d", len(args))
-
- for i := 0; i < len(args); i++ {
- logger.Infof("ARG %d : %+v", i, args[i])
- }
- execStatusFunc := args[1].(func(bool))
-
- // Inform the caller that the callback was executed
- execStatusFunc(true)
-
- return nil
-}
-
-func firstCallback(ctx context.Context, args ...interface{}) interface{} {
- name := args[0]
- id := args[1]
- logger.Infof("Running first callback - name: %s, id: %s\n", name, id)
- return nil
-}
-
-func secondCallback(ctx context.Context, args ...interface{}) interface{} {
- name := args[0].(map[string]string)
- id := args[1]
- logger.Infof("Running second callback - name: %s, id: %f\n", name["name"], id)
- // FIXME: the panic call seem to interfere with the logging mechanism
- //panic("Generating a panic in second callback")
- return nil
-}
-
-func thirdCallback(ctx context.Context, args ...interface{}) interface{} {
- name := args[0]
- id := args[1].(*voltha.Device)
- logger.Infof("Running third callback - name: %+v, id: %s\n", name, id.Id)
- return nil
-}
diff --git a/db/model/branch.go b/db/model/branch.go
deleted file mode 100644
index 17d9ece..0000000
--- a/db/model/branch.go
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "sync"
-
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-// TODO: implement weak references or something equivalent
-// TODO: missing proper logging
-
-// Branch structure is used to classify a collection of transaction based revisions
-type Branch struct {
- mutex sync.RWMutex
- Node *node
- Txid string
- Origin Revision
- Revisions map[string]Revision
- LatestLock sync.RWMutex
- Latest Revision
-}
-
-// NewBranch creates a new instance of the Branch structure
-func NewBranch(node *node, txid string, origin Revision, autoPrune bool) *Branch {
- b := &Branch{}
- b.Node = node
- b.Txid = txid
- b.Origin = origin
- b.Revisions = make(map[string]Revision)
- b.Latest = origin
-
- return b
-}
-
-// Utility function to extract all children names for a given revision (mostly for debugging purposes)
-func (b *Branch) retrieveChildrenNames(revision Revision) []string {
- var childrenNames []string
-
- for _, child := range revision.GetChildren("devices") {
- childrenNames = append(childrenNames, child.GetName())
- }
-
- return childrenNames
-}
-
-// Utility function to compare children names and report the missing ones (mostly for debugging purposes)
-func (b *Branch) findMissingChildrenNames(previousNames, latestNames []string) []string {
- var missingNames []string
-
- for _, previousName := range previousNames {
- found := false
-
- if len(latestNames) == 0 {
- break
- }
-
- for _, latestName := range latestNames {
- if previousName == latestName {
- found = true
- break
- }
- }
- if !found {
- missingNames = append(missingNames, previousName)
- }
- }
-
- return missingNames
-}
-
-// SetLatest assigns the latest revision for this branch
-func (b *Branch) SetLatest(latest Revision) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- if b.Latest != nil {
- logger.Debugw("updating-latest-revision", log.Fields{"current": b.Latest.GetHash(), "new": latest.GetHash()})
-
- // Go through list of children names in current revision and new revision
- // and then compare the resulting outputs to ensure that we have not lost any entries.
-
- if level, _ := log.GetPackageLogLevel(); level == log.DebugLevel {
- var previousNames, latestNames, missingNames []string
-
- if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
- logger.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
- }
-
- if latestNames = b.retrieveChildrenNames(b.Latest); len(latestNames) > 0 {
- logger.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
- }
-
- if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
- logger.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
- }
- }
-
- } else {
- logger.Debugw("setting-latest-revision", log.Fields{"new": latest.GetHash()})
- }
-
- b.Latest = latest
-}
-
-// GetLatest retrieves the latest revision of the branch
-func (b *Branch) GetLatest() Revision {
- b.mutex.RLock()
- defer b.mutex.RUnlock()
-
- return b.Latest
-}
-
-// GetOrigin retrieves the original revision of the branch
-func (b *Branch) GetOrigin() Revision {
- b.mutex.RLock()
- defer b.mutex.RUnlock()
-
- return b.Origin
-}
-
-// AddRevision inserts a new revision to the branch
-func (b *Branch) AddRevision(revision Revision) {
- if revision != nil && b.GetRevision(revision.GetHash()) == nil {
- b.SetRevision(revision.GetHash(), revision)
- }
-}
-
-// GetRevision pulls a revision entry at the specified hash
-func (b *Branch) GetRevision(hash string) Revision {
- b.mutex.RLock()
- defer b.mutex.RUnlock()
-
- if revision, ok := b.Revisions[hash]; ok {
- return revision
- }
-
- return nil
-}
-
-// SetRevision updates a revision entry at the specified hash
-func (b *Branch) SetRevision(hash string, revision Revision) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- b.Revisions[hash] = revision
-}
-
-// DeleteRevision removes a revision with the specified hash
-func (b *Branch) DeleteRevision(hash string) {
- b.mutex.Lock()
- defer b.mutex.Unlock()
-
- delete(b.Revisions, hash)
-}
diff --git a/db/model/branch_test.go b/db/model/branch_test.go
deleted file mode 100644
index d91d15e..0000000
--- a/db/model/branch_test.go
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "crypto/md5"
- "fmt"
- "testing"
-)
-
-var (
- TestBranchBranch *Branch
- TestBranchHash string
-)
-
-// Create a new branch and ensure that fields are populated
-func TestBranch_NewBranch(t *testing.T) {
- node := &node{}
- hash := fmt.Sprintf("%x", md5.Sum([]byte("origin_hash")))
- origin := &NonPersistedRevision{
- Config: &DataRevision{},
- Children: make(map[string][]Revision),
- Hash: hash,
- Branch: &Branch{},
- }
- txid := fmt.Sprintf("%x", md5.Sum([]byte("branch_transaction_id")))
-
- TestBranchBranch = NewBranch(node, txid, origin, true)
- t.Logf("New Branch(txid:%s) created: %+v\n", txid, TestBranchBranch)
-
- if TestBranchBranch.Latest == nil {
- t.Errorf("Branch latest pointer is nil")
- } else if TestBranchBranch.Origin == nil {
- t.Errorf("Branch origin pointer is nil")
- } else if TestBranchBranch.Node == nil {
- t.Errorf("Branch node pointer is nil")
- } else if TestBranchBranch.Revisions == nil {
- t.Errorf("Branch revisions map is nil")
- } else if TestBranchBranch.Txid == "" {
- t.Errorf("Branch transaction id is empty")
- }
-}
-
-// Add a new revision to the branch
-func TestBranch_AddRevision(t *testing.T) {
- TestBranchHash = fmt.Sprintf("%x", md5.Sum([]byte("revision_hash")))
- rev := &NonPersistedRevision{
- Config: &DataRevision{},
- Children: make(map[string][]Revision),
- Hash: TestBranchHash,
- Branch: &Branch{},
- }
-
- TestBranchBranch.AddRevision(rev)
- t.Logf("Added revision: %+v\n", rev)
-
- if len(TestBranchBranch.Revisions) == 0 {
- t.Errorf("Branch revisions map is empty")
- }
-}
-
-// Ensure that the added revision can be retrieved
-func TestBranch_GetRevision(t *testing.T) {
- if rev := TestBranchBranch.GetRevision(TestBranchHash); rev == nil {
- t.Errorf("Unable to retrieve revision for hash:%s", TestBranchHash)
- } else {
- t.Logf("Got revision for hash:%s rev:%+v\n", TestBranchHash, rev)
- }
-}
-
-// Set the added revision as the latest
-func TestBranch_LatestRevision(t *testing.T) {
- addedRevision := TestBranchBranch.GetRevision(TestBranchHash)
- TestBranchBranch.SetLatest(addedRevision)
-
- rev := TestBranchBranch.GetLatest()
- t.Logf("Retrieved latest revision :%+v", rev)
-
- if rev == nil {
- t.Error("Unable to retrieve latest revision")
- } else if rev.GetHash() != TestBranchHash {
- t.Errorf("Latest revision does not match hash: %s", TestBranchHash)
- }
-}
-
-// Ensure that the origin revision remains and differs from subsequent revisions
-func TestBranch_OriginRevision(t *testing.T) {
- rev := TestBranchBranch.Origin
- t.Logf("Retrieved origin revision :%+v", rev)
-
- if rev == nil {
- t.Error("Unable to retrieve origin revision")
- } else if rev.GetHash() == TestBranchHash {
- t.Errorf("Origin revision should differ from added revision: %s", TestBranchHash)
- }
-}
diff --git a/db/model/callback_type.go b/db/model/callback_type.go
deleted file mode 100644
index 796a6ce..0000000
--- a/db/model/callback_type.go
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-// CallbackType is an enumerated value to express when a callback should be executed
-type CallbackType uint8
-
-// Enumerated list of callback types
-const (
- Get CallbackType = iota
- PreUpdate
- PostUpdate
- PreAdd
- PostAdd
- PreRemove
- PostRemove
- PostListchange
-)
-
-var enumCallbackTypes = []string{
- "GET",
- "PRE_UPDATE",
- "POST_UPDATE",
- "PRE_ADD",
- "POST_ADD",
- "PRE_REMOVE",
- "POST_REMOVE",
- "POST_LISTCHANGE",
-}
-
-func (t CallbackType) String() string {
- return enumCallbackTypes[t]
-}
diff --git a/db/model/child_type.go b/db/model/child_type.go
deleted file mode 100644
index 494c0ef..0000000
--- a/db/model/child_type.go
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- desc "github.com/golang/protobuf/descriptor"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/protoc-gen-go/descriptor"
- "github.com/opencord/voltha-protos/v3/go/common"
- "reflect"
- "strconv"
- "sync"
-)
-
-type childTypesSingleton struct {
- mutex sync.RWMutex
- cache map[reflect.Type]map[string]*ChildType
-}
-
-var childTypes = &childTypesSingleton{cache: make(map[reflect.Type]map[string]*ChildType)}
-
-func (s *childTypesSingleton) GetCacheEntry(key reflect.Type) (map[string]*ChildType, bool) {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- childTypeMap, exists := s.cache[key]
- return childTypeMap, exists
-}
-
-func (s *childTypesSingleton) SetCacheEntry(key reflect.Type, value map[string]*ChildType) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.cache[key] = value
-}
-
-// ChildType structure contains construct details of an object
-type ChildType struct {
- ClassModule string
- ClassType reflect.Type
- IsContainer bool
- Key string
- KeyFromStr func(s string) interface{}
-}
-
-// ChildrenFields retrieves list of child objects associated to a given type
-func ChildrenFields(cls interface{}) map[string]*ChildType {
- if cls == nil {
- return nil
- }
-
- msgType := reflect.TypeOf(cls)
-
- if fields, have := childTypes.GetCacheEntry(msgType); have {
- return fields
- }
-
- fields := make(map[string]*ChildType)
- _, md := desc.ForMessage(cls.(desc.Message))
-
- // TODO: Do we need to validate MD for nil, panic or exception?
- for _, field := range md.Field {
- if options := field.GetOptions(); options != nil {
- if proto.HasExtension(options, common.E_ChildNode) {
- isContainer := *field.Label == descriptor.FieldDescriptorProto_LABEL_REPEATED
- meta, _ := proto.GetExtension(options, common.E_ChildNode)
-
- var keyFromStr func(string) interface{}
- var ct ChildType
-
- parentType := FindOwnerType(reflect.ValueOf(cls), field.GetName(), 0, false)
- if meta.(*common.ChildNode).GetKey() != "" {
- keyType := FindKeyOwner(reflect.New(parentType).Elem().Interface(), meta.(*common.ChildNode).GetKey(), 0)
-
- switch keyType.(reflect.Type).Name() {
- case "string":
- keyFromStr = func(s string) interface{} {
- return s
- }
- case "int32":
- keyFromStr = func(s string) interface{} {
- i, _ := strconv.Atoi(s)
- return int32(i)
- }
- case "int64":
- keyFromStr = func(s string) interface{} {
- i, _ := strconv.Atoi(s)
- return int64(i)
- }
- case "uint32":
- keyFromStr = func(s string) interface{} {
- i, _ := strconv.Atoi(s)
- return uint32(i)
- }
- case "uint64":
- keyFromStr = func(s string) interface{} {
- i, _ := strconv.Atoi(s)
- return uint64(i)
- }
- default:
- logger.Errorf("Key type not implemented - type: %s\n", keyType.(reflect.Type))
- }
- }
-
- ct = ChildType{
- ClassModule: parentType.String(),
- ClassType: parentType,
- IsContainer: isContainer,
- Key: meta.(*common.ChildNode).GetKey(),
- KeyFromStr: keyFromStr,
- }
-
- fields[field.GetName()] = &ct
- }
- }
- }
-
- // If called multiple times in quick succession w/ the same message types, it is possible for different cache entries to be returned.
- // This should not be an issue, as the cache is merely for optimization purposes.
- childTypes.SetCacheEntry(msgType, fields)
- return fields
-}
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
deleted file mode 100644
index b57d988..0000000
--- a/db/model/child_type_test.go
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "reflect"
- "testing"
-)
-
-// Dissect a proto message by extracting all the children fields
-func TestChildType_01_Device_Proto_ChildrenFields(t *testing.T) {
- var cls *voltha.Device
-
- t.Logf("Extracting children fields from proto type: %s", reflect.TypeOf(cls))
- names := ChildrenFields(cls)
- t.Logf("Extracting children field names: %+v", names)
-
- expectedKeys := []string{"ports", "flows", "flow_groups", "image_downloads", "pm_configs"}
- for _, key := range expectedKeys {
- if _, exists := names[key]; !exists {
- t.Errorf("Missing key:%s from class type:%s", key, reflect.TypeOf(cls))
- }
- }
-}
-
-// Verify that the cache contains an entry for types on which ChildrenFields was performed
-func TestChildType_02_Cache_Keys(t *testing.T) {
- childTypes.mutex.RLock()
- defer childTypes.mutex.RUnlock()
-
- if _, exists := childTypes.cache[reflect.TypeOf(&voltha.Device{})]; !exists {
- t.Errorf("childTypes.cache should have an entry of type: %+v\n", reflect.TypeOf(&voltha.Device{}).String())
- }
- for k := range childTypes.cache {
- t.Logf("childTypes.cache Key:%+v\n", k)
- }
-}
diff --git a/db/model/data_revision.go b/db/model/data_revision.go
deleted file mode 100644
index 0861d76..0000000
--- a/db/model/data_revision.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "bytes"
- "crypto/md5"
- "encoding/json"
- "fmt"
- "reflect"
-
- "github.com/golang/protobuf/proto"
-)
-
-// DataRevision stores the data associated to a revision along with its calculated checksum hash value
-type DataRevision struct {
- Data interface{}
- Hash string
-}
-
-// NewDataRevision creates a new instance of a DataRevision structure
-func NewDataRevision(root *root, data interface{}) *DataRevision {
- dr := DataRevision{}
- dr.Data = data
- dr.Hash = dr.hashData(root, data)
-
- return &dr
-}
-
-func (dr *DataRevision) hashData(root *root, data interface{}) string {
- var buffer bytes.Buffer
-
- if IsProtoMessage(data) {
- if pbdata, err := proto.Marshal(data.(proto.Message)); err != nil {
- logger.Debugf("problem to marshal protobuf data --> err: %s", err.Error())
- } else {
- buffer.Write(pbdata)
- // To ensure uniqueness in case data is nil, also include data type
- buffer.Write([]byte(reflect.TypeOf(data).String()))
- }
-
- } else if reflect.ValueOf(data).IsValid() {
- dataObj := reflect.New(reflect.TypeOf(data).Elem())
- if json, err := json.Marshal(dataObj.Interface()); err != nil {
- logger.Debugf("problem to marshal data --> err: %s", err.Error())
- } else {
- buffer.Write(json)
- }
- } else {
- dataObj := reflect.New(reflect.TypeOf(data).Elem())
- buffer.Write(dataObj.Bytes())
- }
-
- // Add the root pointer that owns the current data for extra uniqueness
- rootPtr := fmt.Sprintf("%p", root)
- buffer.Write([]byte(rootPtr))
-
- return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
-}
diff --git a/db/model/data_revision_test.go b/db/model/data_revision_test.go
deleted file mode 100644
index cf19d3c..0000000
--- a/db/model/data_revision_test.go
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "reflect"
- "testing"
-
- "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-protos/v3/go/common"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/stretchr/testify/assert"
-)
-
-var (
- TestNodePort = []*voltha.Port{
- {
- PortNo: 123,
- Label: "test-etcd_port-0",
- Type: voltha.Port_PON_OLT,
- AdminState: common.AdminState_ENABLED,
- OperStatus: common.OperStatus_ACTIVE,
- DeviceId: "etcd_port-0-device-id",
- Peers: []*voltha.Port_PeerPort{},
- },
- }
-
- TestNodeData = &voltha.Device{
- Id: "Config-Node-1",
- Type: "simulated_olt",
- Root: true,
- ParentId: "",
- ParentPortNo: 0,
- Vendor: "voltha-test",
- Model: "Modelxx",
- HardwareVersion: "0.0.1",
- FirmwareVersion: "0.0.1",
- Images: &voltha.Images{},
- SerialNumber: "1234567890",
- VendorId: "XXBB-INC",
- Adapter: "simulated_olt",
- Vlan: 1234,
- Address: &voltha.Device_HostAndPort{HostAndPort: "127.0.0.1:1234"},
- ExtraArgs: "",
- ProxyAddress: &voltha.Device_ProxyAddress{},
- AdminState: voltha.AdminState_PREPROVISIONED,
- OperStatus: common.OperStatus_ACTIVE,
- Reason: "",
- ConnectStatus: common.ConnectStatus_REACHABLE,
- Custom: &any.Any{},
- Ports: TestNodePort,
- Flows: &openflow_13.Flows{},
- FlowGroups: &openflow_13.FlowGroups{},
- PmConfigs: &voltha.PmConfigs{},
- ImageDownloads: []*voltha.ImageDownload{},
- }
-)
-
-func TestNewDataRevision(t *testing.T) {
-
- TestNodeRoot := &root{RevisionClass: reflect.TypeOf(NonPersistedRevision{})}
- dr := NewDataRevision(TestNodeRoot, TestNodeData)
- t.Logf("Data -->%v, Hash-->%v, ", dr.Data, dr.Hash)
- assert.NotNil(t, dr.Data)
- assert.True(t, reflect.TypeOf(dr.Data) == reflect.TypeOf(TestNodeData), "Data Type mismatch on NonPersistedRevision")
- assert.True(t, reflect.ValueOf(dr.Data) == reflect.ValueOf(TestNodeData), "Data Values mismatch on NonPersistedRevision")
- assert.NotNil(t, dr.Hash)
-
- drPR := NewDataRevision(&root{RevisionClass: reflect.TypeOf(PersistedRevision{})}, TestNodeData)
- assert.NotNil(t, drPR)
- assert.True(t, reflect.TypeOf(drPR.Data) == reflect.TypeOf(TestNodeData), "Data Type mismatc on PersistedRevisionh")
- assert.True(t, reflect.ValueOf(drPR.Data) == reflect.ValueOf(TestNodeData), "Data Values mismatch PersistedRevision")
- assert.NotNil(t, drPR.Hash)
-}
-func TestNoDataRevision(t *testing.T) {
-
- TestNodeData = nil
- TestNodeRoot = &root{RevisionClass: reflect.TypeOf(NonPersistedRevision{})}
- rev := NewDataRevision(TestNodeRoot, TestNodeData)
- assert.Nil(t, rev.Data, "Problem to marshal data when data is nil")
-
-}
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
deleted file mode 100644
index f3dfcb2..0000000
--- a/db/model/event_bus.go
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "encoding/json"
-
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-protos/v3/go/voltha"
-)
-
-// EventBus contains the details required to communicate with the event bus mechanism
-type EventBus struct {
- client *EventBusClient
- topic string
-}
-
-// ignoredCallbacks keeps a list of callbacks that should not be advertised on the event bus
-var (
- ignoredCallbacks = map[CallbackType]struct{}{
- PreAdd: {},
- Get: {},
- PostListchange: {},
- PreRemove: {},
- PreUpdate: {},
- }
-)
-
-// NewEventBus creates a new instance of the EventBus structure
-func NewEventBus() *EventBus {
- bus := &EventBus{
- client: NewEventBusClient(),
- topic: "model-change-events",
- }
- return bus
-}
-
-// Advertise will publish the provided information to the event bus
-func (bus *EventBus) Advertise(args ...interface{}) interface{} {
- eventType := args[0].(CallbackType)
- hash := args[1].(string)
- data := args[2:]
-
- if _, ok := ignoredCallbacks[eventType]; ok {
- logger.Debugf("ignoring event - type:%s, data:%+v", eventType, data)
- }
- var kind voltha.ConfigEventType_Types
- switch eventType {
- case PostAdd:
- kind = voltha.ConfigEventType_add
- case PostRemove:
- kind = voltha.ConfigEventType_remove
- default:
- kind = voltha.ConfigEventType_update
- }
-
- var msg []byte
- var err error
- if IsProtoMessage(data) {
- if msg, err = proto.Marshal(data[0].(proto.Message)); err != nil {
- logger.Debugf("problem marshalling proto data: %+v, err:%s", data[0], err.Error())
- }
- } else if data[0] != nil {
- if msg, err = json.Marshal(data[0]); err != nil {
- logger.Debugf("problem marshalling json data: %+v, err:%s", data[0], err.Error())
- }
- } else {
- logger.Debugf("no data to advertise : %+v", data[0])
- }
-
- event := voltha.ConfigEvent{
- Type: kind,
- Hash: hash,
- Data: string(msg),
- }
-
- bus.client.Publish(bus.topic, event)
-
- return nil
-}
diff --git a/db/model/event_bus_client.go b/db/model/event_bus_client.go
deleted file mode 100644
index 93a64f9..0000000
--- a/db/model/event_bus_client.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "github.com/opencord/voltha-protos/v3/go/voltha"
-)
-
-// EventBusClient is an abstraction layer structure to communicate with an event bus mechanism
-type EventBusClient struct {
-}
-
-// NewEventBusClient creates a new EventBusClient instance
-func NewEventBusClient() *EventBusClient {
- return &EventBusClient{}
-}
-
-// Publish sends a event to the bus
-func (ebc *EventBusClient) Publish(topic string, event voltha.ConfigEvent) {
- logger.Debugf("publishing event:%+v, topic:%s\n", event, topic)
-}
diff --git a/db/model/merge.go b/db/model/merge.go
deleted file mode 100644
index 01e942b..0000000
--- a/db/model/merge.go
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "context"
-
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-func revisionsAreEqual(a, b []Revision) bool {
- // If one is nil, the other must also be nil.
- if (a == nil) != (b == nil) {
- return false
- }
-
- if len(a) != len(b) {
- return false
- }
-
- for i := range a {
- if a[i] != b[i] {
- return false
- }
- }
-
- return true
-}
-
-type changeAnalysis struct {
- KeyMap1 map[string]int
- KeyMap2 map[string]int
- AddedKeys map[string]struct{}
- RemovedKeys map[string]struct{}
- ChangedKeys map[string]struct{}
-}
-
-func newChangeAnalysis(lst1, lst2 []Revision, keyName string) *changeAnalysis {
- changes := &changeAnalysis{}
-
- changes.KeyMap1 = make(map[string]int)
- changes.KeyMap2 = make(map[string]int)
-
- changes.AddedKeys = make(map[string]struct{})
- changes.RemovedKeys = make(map[string]struct{})
- changes.ChangedKeys = make(map[string]struct{})
-
- for i, rev := range lst1 {
- _, v := GetAttributeValue(rev.GetData(), keyName, 0)
- changes.KeyMap1[v.String()] = i
- }
- for i, rev := range lst2 {
- _, v := GetAttributeValue(rev.GetData(), keyName, 0)
- changes.KeyMap2[v.String()] = i
- }
- for v := range changes.KeyMap2 {
- if _, ok := changes.KeyMap1[v]; !ok {
- changes.AddedKeys[v] = struct{}{}
- }
- }
- for v := range changes.KeyMap1 {
- if _, ok := changes.KeyMap2[v]; !ok {
- changes.RemovedKeys[v] = struct{}{}
- }
- }
- for v := range changes.KeyMap1 {
- if _, ok := changes.KeyMap2[v]; ok && lst1[changes.KeyMap1[v]].GetHash() != lst2[changes.KeyMap2[v]].GetHash() {
- changes.ChangedKeys[v] = struct{}{}
- }
- }
-
- return changes
-}
-
-// Merge3Way takes care of combining the revision contents of the same data set
-func Merge3Way(
- ctx context.Context,
- forkRev, srcRev, dstRev Revision,
- mergeChildFunc func(Revision) Revision,
- dryRun bool) (rev Revision, changes []ChangeTuple) {
-
- logger.Debugw("3-way-merge-request", log.Fields{"dryRun": dryRun})
-
- var configChanged bool
- var revsToDiscard []Revision
-
- if dstRev.GetConfig() == forkRev.GetConfig() {
- configChanged = dstRev.GetConfig() != srcRev.GetConfig()
- } else {
- if dstRev.GetConfig().Hash != srcRev.GetConfig().Hash {
- logger.Error("config-collision")
- }
- configChanged = true
- }
-
- //newChildren := reflect.ValueOf(dstRev.GetAllChildren()).Interface().(map[string][]Revision)
- newChildren := make(map[string][]Revision)
- for entryName, childrenEntry := range dstRev.GetAllChildren() {
- //newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
- newChildren[entryName] = make([]Revision, len(childrenEntry))
- copy(newChildren[entryName], childrenEntry)
- }
-
- childrenFields := ChildrenFields(forkRev.GetData())
-
- for fieldName, field := range childrenFields {
- forkList := forkRev.GetChildren(fieldName)
- srcList := srcRev.GetChildren(fieldName)
- dstList := dstRev.GetChildren(fieldName)
-
- if revisionsAreEqual(dstList, srcList) {
- for _, rev := range srcList {
- mergeChildFunc(rev)
- }
- continue
- }
-
- if field.Key == "" {
- if revisionsAreEqual(dstList, forkList) {
- if !revisionsAreEqual(srcList, forkList) {
- logger.Error("we should not be here")
- } else {
- for _, rev := range srcList {
- newChildren[fieldName] = append(newChildren[fieldName], mergeChildFunc(rev))
- }
- if field.IsContainer {
- changes = append(
- changes, ChangeTuple{PostListchange,
- NewOperationContext("", nil, fieldName, ""), nil},
- )
- }
- }
- } else {
- if !revisionsAreEqual(srcList, forkList) {
- logger.Error("cannot merge - single child node or un-keyed children list has changed")
- }
- }
- } else {
- if revisionsAreEqual(dstList, forkList) {
- src := newChangeAnalysis(forkList, srcList, field.Key)
-
- newList := make([]Revision, len(srcList))
- copy(newList, srcList)
-
- for key := range src.AddedKeys {
- idx := src.KeyMap2[key]
- newRev := mergeChildFunc(newList[idx])
-
- // FIXME: newRev may come back as nil... exclude those entries for now
- if newRev != nil {
- newList[idx] = newRev
- changes = append(changes, ChangeTuple{PostAdd, newList[idx].GetData(), newRev.GetData()})
- }
- }
- for key := range src.RemovedKeys {
- oldRev := forkList[src.KeyMap1[key]]
- revsToDiscard = append(revsToDiscard, oldRev)
- changes = append(changes, ChangeTuple{PostRemove, oldRev.GetData(), nil})
- }
- for key := range src.ChangedKeys {
- idx := src.KeyMap2[key]
- newRev := mergeChildFunc(newList[idx])
-
- // FIXME: newRev may come back as nil... exclude those entries for now
- if newRev != nil {
- newList[idx] = newRev
- }
- }
-
- if !dryRun {
- newChildren[fieldName] = newList
- }
- } else {
- src := newChangeAnalysis(forkList, srcList, field.Key)
- dst := newChangeAnalysis(forkList, dstList, field.Key)
-
- newList := make([]Revision, len(dstList))
- copy(newList, dstList)
-
- for key := range src.AddedKeys {
- if _, exists := dst.AddedKeys[key]; exists {
- childDstRev := dstList[dst.KeyMap2[key]]
- childSrcRev := srcList[src.KeyMap2[key]]
- if childDstRev.GetHash() == childSrcRev.GetHash() {
- mergeChildFunc(childDstRev)
- } else {
- logger.Error("conflict error - revision has been added is different")
- }
- } else {
- newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
- newList = append(newList, newRev)
- changes = append(changes, ChangeTuple{PostAdd, srcList[src.KeyMap2[key]], newRev.GetData()})
- }
- }
- for key := range src.ChangedKeys {
- if _, removed := dst.RemovedKeys[key]; removed {
- logger.Error("conflict error - revision has been removed")
- } else if _, changed := dst.ChangedKeys[key]; changed {
- childDstRev := dstList[dst.KeyMap2[key]]
- childSrcRev := srcList[src.KeyMap2[key]]
- if childDstRev.GetHash() == childSrcRev.GetHash() {
- mergeChildFunc(childSrcRev)
- } else if childDstRev.GetConfig().Hash != childSrcRev.GetConfig().Hash {
- logger.Error("conflict error - revision has been changed and is different")
- } else {
- newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
- newList[dst.KeyMap2[key]] = newRev
- }
- } else {
- newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
- newList[dst.KeyMap2[key]] = newRev
- }
- }
-
- // TODO: how do i sort this map in reverse order?
- for key := range src.RemovedKeys {
- if _, changed := dst.ChangedKeys[key]; changed {
- logger.Error("conflict error - revision has changed")
- }
- if _, removed := dst.RemovedKeys[key]; !removed {
- dstIdx := dst.KeyMap2[key]
- oldRev := newList[dstIdx]
- revsToDiscard = append(revsToDiscard, oldRev)
-
- copy(newList[dstIdx:], newList[dstIdx+1:])
- newList[len(newList)-1] = nil
- newList = newList[:len(newList)-1]
-
- changes = append(changes, ChangeTuple{PostRemove, oldRev.GetData(), nil})
- }
- }
-
- if !dryRun {
- newChildren[fieldName] = newList
- }
- }
- }
- }
-
- if !dryRun && len(newChildren) > 0 {
- if configChanged {
- rev = srcRev
- } else {
- rev = dstRev
- }
-
- for _, discarded := range revsToDiscard {
- discarded.Drop("", true)
- }
-
- // FIXME: Do not discard the latest value for now
- //dstRev.GetBranch().GetLatest().Drop("", configChanged)
- rev = rev.UpdateAllChildren(ctx, newChildren, dstRev.GetBranch())
-
- if configChanged {
- changes = append(changes, ChangeTuple{PostUpdate, dstRev.GetBranch().GetLatest().GetData(), rev.GetData()})
- }
- return rev, changes
- }
-
- return nil, nil
-}
diff --git a/db/model/model.go b/db/model/model.go
deleted file mode 100644
index 0453122..0000000
--- a/db/model/model.go
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-type contextKey string
-
-const (
- // DataRefreshPeriod is period to determine when data requires a refresh (in milliseconds)
- // TODO: make this configurable?
- DataRefreshPeriod int64 = 5000
-
- // RequestTimestamp attribute used to store a timestamp in the context object
- RequestTimestamp contextKey = "request-timestamp"
-
- // ReservationTTL is time limit for a KV path reservation (in seconds)
- ReservationTTL int64 = 180
-)
diff --git a/db/model/node.go b/db/model/node.go
deleted file mode 100644
index 947cfc7..0000000
--- a/db/model/node.go
+++ /dev/null
@@ -1,1114 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-// TODO: proper error handling
-// TODO: proper logging
-
-import (
- "context"
- "fmt"
- "reflect"
- "strings"
- "sync"
- "time"
-
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-// When a branch has no transaction id, everything gets stored in NONE
-const (
- NONE string = "none"
-)
-
-// Node interface is an abstraction of the node data structure
-type Node interface {
- MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
-
- // CRUD functions
- Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
- Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error)
- List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error)
- Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
- Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
- CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error)
-
- GetProxy() *Proxy
-
- MakeBranch(txid string) *Branch
- DeleteBranch(txid string)
- MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error)
-
- MakeTxBranch() string
- DeleteTxBranch(txid string)
- FoldTxBranch(ctx context.Context, txid string)
-}
-
-type node struct {
- mutex sync.RWMutex
- Root *root
- Type interface{}
- Branches map[string]*Branch
- Tags map[string]Revision
- Proxy *Proxy
- EventBus *EventBus
- AutoPrune bool
-}
-
-// ChangeTuple holds details of modifications made to a revision
-type ChangeTuple struct {
- Type CallbackType
- PreviousData interface{}
- LatestData interface{}
-}
-
-// newNode creates a new instance of the node data structure
-func newNode(root *root, initialData interface{}, autoPrune bool, txid string) *node {
- n := &node{}
-
- n.Root = root
- n.Branches = make(map[string]*Branch)
- n.Tags = make(map[string]Revision)
- n.Proxy = nil
- n.EventBus = nil
- n.AutoPrune = autoPrune
-
- if IsProtoMessage(initialData) {
- n.Type = reflect.ValueOf(initialData).Interface()
- dataCopy := proto.Clone(initialData.(proto.Message))
- n.initialize(dataCopy, txid)
- } else if reflect.ValueOf(initialData).IsValid() {
- // FIXME: this block does not reflect the original implementation
- // it should be checking if the provided initial_data is already a type!??!
- // it should be checked before IsProtoMessage
- n.Type = reflect.ValueOf(initialData).Interface()
- } else {
- // not implemented error
- logger.Errorf("cannot process initial data - %+v", initialData)
- }
-
- return n
-}
-
-// MakeNode creates a new node in the tree
-func (n *node) MakeNode(data interface{}, txid string) *node {
- return newNode(n.Root, data, true, txid)
-}
-
-// MakeRevision create a new revision of the node in the tree
-func (n *node) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- return n.GetRoot().MakeRevision(branch, data, children)
-}
-
-// makeLatest will mark the revision of a node as being the latest
-func (n *node) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- // Keep a reference to the current revision
- var previous string
- if branch.GetLatest() != nil {
- previous = branch.GetLatest().GetHash()
- }
-
- branch.AddRevision(revision)
-
- // If anything is new, then set the revision as the latest
- if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
- if revision.GetName() != "" {
- logger.Debugw("saving-latest-data", log.Fields{"hash": revision.GetHash(), "data": revision.GetData()})
- // Tag a timestamp to that revision
- revision.SetLastUpdate()
- getRevCache().Set(revision.GetName(), revision)
- }
- branch.SetLatest(revision)
- }
-
- // Delete the previous revision if anything has changed
- if previous != "" && previous != branch.GetLatest().GetHash() {
- branch.DeleteRevision(previous)
- }
-
- if changeAnnouncement != nil && branch.Txid == "" {
- if n.Proxy != nil {
- for _, change := range changeAnnouncement {
- logger.Debugw("adding-callback",
- log.Fields{
- "callbacks": n.GetProxy().getCallbacks(change.Type),
- "type": change.Type,
- "previousData": change.PreviousData,
- "latestData": change.LatestData,
- })
- n.Root.AddCallback(
- n.GetProxy().InvokeCallbacks,
- change.Type,
- true,
- change.PreviousData,
- change.LatestData)
- }
- }
- }
-}
-
-// Latest returns the latest revision of node with or without the transaction id
-func (n *node) Latest(txid ...string) Revision {
- var branch *Branch
-
- if len(txid) > 0 && txid[0] != "" {
- if branch = n.GetBranch(txid[0]); branch != nil {
- return branch.GetLatest()
- }
- } else if branch = n.GetBranch(NONE); branch != nil {
- return branch.GetLatest()
- }
- return nil
-}
-
-// initialize prepares the content of a node along with its possible ramifications
-func (n *node) initialize(data interface{}, txid string) {
- children := make(map[string][]Revision)
- for fieldName, field := range ChildrenFields(n.Type) {
- _, fieldValue := GetAttributeValue(data, fieldName, 0)
-
- if fieldValue.IsValid() {
- if field.IsContainer {
- if field.Key != "" {
- for i := 0; i < fieldValue.Len(); i++ {
- v := fieldValue.Index(i)
-
- if rev := n.MakeNode(v.Interface(), txid).Latest(txid); rev != nil {
- children[fieldName] = append(children[fieldName], rev)
- }
-
- // TODO: The following logic was ported from v1.0. Need to verify if it is required
- //var keysSeen []string
- //_, key := GetAttributeValue(v.Interface(), field.Key, 0)
- //for _, k := range keysSeen {
- // if k == key.String() {
- // //logger.Errorf("duplicate key - %s", k)
- // }
- //}
- //keysSeen = append(keysSeen, key.String())
- }
-
- } else {
- for i := 0; i < fieldValue.Len(); i++ {
- v := fieldValue.Index(i)
- if newNodeRev := n.MakeNode(v.Interface(), txid).Latest(); newNodeRev != nil {
- children[fieldName] = append(children[fieldName], newNodeRev)
- }
- }
- }
- } else {
- if newNodeRev := n.MakeNode(fieldValue.Interface(), txid).Latest(); newNodeRev != nil {
- children[fieldName] = append(children[fieldName], newNodeRev)
- }
- }
- } else {
- logger.Errorf("field is invalid - %+v", fieldValue)
- }
- }
-
- branch := NewBranch(n, "", nil, n.AutoPrune)
- rev := n.MakeRevision(branch, data, children)
- n.makeLatest(branch, rev, nil)
-
- if txid == "" {
- n.SetBranch(NONE, branch)
- } else {
- n.SetBranch(txid, branch)
- }
-}
-
-// findRevByKey retrieves a specific revision from a node tree
-func (n *node) findRevByKey(revs []Revision, keyName string, value interface{}) (int, Revision) {
- for i, rev := range revs {
- dataValue := reflect.ValueOf(rev.GetData())
- dataStruct := GetAttributeStructure(rev.GetData(), keyName, 0)
-
- fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
-
- a := fmt.Sprintf("%s", fieldValue.Interface())
- b := fmt.Sprintf("%s", value)
- if a == b {
- return i, revs[i]
- }
- }
-
- return -1, nil
-}
-
-// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error) {
- n.mutex.Lock()
- defer n.mutex.Unlock()
-
- logger.Debugw("node-list-request", log.Fields{"path": path, "hash": hash, "depth": depth, "deep": deep, "txid": txid})
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
-
- var branch *Branch
- var rev Revision
-
- if branch = n.GetBranch(txid); txid == "" || branch == nil {
- branch = n.GetBranch(NONE)
- }
-
- if hash != "" {
- rev = branch.GetRevision(hash)
- } else {
- rev = branch.GetLatest()
- }
-
- var result interface{}
- var prList []interface{}
-
- pr, err := rev.LoadFromPersistence(ctx, path, txid, nil)
- if err != nil {
- logger.Errorf("failed-to-load-from-persistence")
- return nil, err
- }
- if pr != nil {
- for _, revEntry := range pr {
- prList = append(prList, revEntry.GetData())
- }
- result = prList
- }
- return result, nil
-}
-
-// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) (interface{}, error) {
- n.mutex.Lock()
- defer n.mutex.Unlock()
-
- logger.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
-
- var branch *Branch
- var rev Revision
-
- if branch = n.GetBranch(txid); txid == "" || branch == nil {
- branch = n.GetBranch(NONE)
- }
-
- if hash != "" {
- rev = branch.GetRevision(hash)
- } else {
- rev = branch.GetLatest()
- }
-
- var result interface{}
-
- // If there is no request to reconcile, try to get it from memory
- if !reconcile {
- // Try to find an entry matching the path value from one of these sources
- // 1. Start with the cache which stores revisions by watch names
- // 2. Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
- // 3. Move on to the KV store if that path cannot be found or if the entry has expired
- if entry, exists := getRevCache().Get(path); exists && entry.(Revision) != nil {
- entryAge := time.Since(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
- if entryAge < DataRefreshPeriod {
- logger.Debugw("using-cache-entry", log.Fields{
- "path": path,
- "hash": hash,
- "age": entryAge,
- })
- return proto.Clone(entry.(Revision).GetData().(proto.Message)), nil
- }
- logger.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
- } else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
- logger.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
- return result, nil
- } else {
- logger.Debugw("not-using-cache-entry", log.Fields{
- "path": path,
- "hash": hash, "depth": depth,
- "reconcile": reconcile,
- "txid": txid,
- })
- }
- } else {
- logger.Debugw("reconcile-requested", log.Fields{
- "path": path,
- "hash": hash,
- "reconcile": reconcile,
- })
- }
-
- // If we got to this point, we are either trying to reconcile with the db
- // or we simply failed at getting information from memory
- if n.Root.KvStore != nil {
- if pr, err := rev.LoadFromPersistence(ctx, path, txid, nil); err != nil {
- logger.Errorf("failed-to-load-from-persistence")
- return nil, err
- } else if len(pr) > 0 {
- // Did we receive a single or multiple revisions?
- if len(pr) > 1 {
- var revs []interface{}
- for _, revEntry := range pr {
- revs = append(revs, revEntry.GetData())
- }
- result = revs
- } else {
- result = pr[0].GetData()
- }
- }
- }
- return result, nil
-}
-
-//getPath traverses the specified path and retrieves the data associated to it
-func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
- if path == "" {
- return n.getData(ctx, rev, depth)
- }
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- names := ChildrenFields(n.Type)
- field := names[name]
-
- if field != nil && field.IsContainer {
- children := make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- if field.Key != "" {
- if path != "" {
- partition = strings.SplitN(path, "/", 2)
- key := partition[0]
- path = ""
- keyValue := field.KeyFromStr(key)
- _, childRev := n.findRevByKey(children, field.Key, keyValue)
- if childRev == nil {
- return nil
- }
- childNode := childRev.getNode()
- return childNode.getPath(ctx, childRev, path, depth)
- }
- var response []interface{}
- for _, childRev := range children {
- childNode := childRev.getNode()
- value := childNode.getData(ctx, childRev, depth)
- response = append(response, value)
- }
- return response
- }
- var response []interface{}
- if path != "" {
- // TODO: raise error
- return response
- }
- for _, childRev := range children {
- childNode := childRev.getNode()
- value := childNode.getData(ctx, childRev, depth)
- response = append(response, value)
- }
- return response
- } else if children := rev.GetChildren(name); children != nil {
- childRev := children[0]
- childNode := childRev.getNode()
- return childNode.getPath(ctx, childRev, path, depth)
- }
-
- return nil
-}
-
-// getData retrieves the data from a node revision
-func (n *node) getData(ctx context.Context, rev Revision, depth int) interface{} {
- msg := rev.GetBranch().GetLatest().Get(depth)
- var modifiedMsg interface{}
-
- if n.GetProxy() != nil {
- logger.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
- if modifiedMsg = n.GetProxy().InvokeCallbacks(ctx, Get, false, msg); modifiedMsg != nil {
- msg = modifiedMsg
- }
-
- }
-
- return msg
-}
-
-// Update changes the content of a node at the specified path with the provided data
-func (n *node) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
- n.mutex.Lock()
- defer n.mutex.Unlock()
-
- logger.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid})
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
-
- var branch *Branch
- if txid == "" {
- branch = n.GetBranch(NONE)
- } else if branch = n.GetBranch(txid); branch == nil {
- branch = makeBranch(n)
- }
-
- if branch.GetLatest() != nil {
- logger.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
- }
- if path == "" {
- return n.doUpdate(ctx, branch, data, strict)
- }
-
- rev := branch.GetLatest()
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(n.Type)[name]
- var children []Revision
-
- if field == nil {
- return n.doUpdate(ctx, branch, data, strict)
- }
-
- if field.IsContainer {
- if path == "" {
- logger.Errorf("cannot update a list")
- } else if field.Key != "" {
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue := field.KeyFromStr(key)
-
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
-
- if childRev == nil {
- logger.Debugw("child-revision-is-nil", log.Fields{"key": keyValue})
- return branch.GetLatest()
- }
-
- childNode := childRev.getNode()
-
- // Save proxy in child node to ensure callbacks are called later on
- // only assign in cases of non sub-folder proxies, i.e. "/"
- if childNode.Proxy == nil && n.Proxy != nil && n.GetProxy().getFullPath() == "" {
- childNode.Proxy = n.Proxy
- }
-
- newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
-
- if newChildRev.GetHash() == childRev.GetHash() {
- if newChildRev != childRev {
- logger.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
- newChildRev.ClearHash()
- }
- logger.Debugw("child-revisions-have-matching-hash", log.Fields{"hash": childRev.GetHash(), "key": keyValue})
- return branch.GetLatest()
- }
-
- _, newKey := GetAttributeValue(newChildRev.GetData(), field.Key, 0)
-
- _newKeyType := newKey.String()
- _keyValueType := fmt.Sprintf("%s", keyValue)
-
- if _newKeyType != _keyValueType {
- logger.Errorf("cannot change key field")
- }
-
- // Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
- newChildRev.SetName(name + "/" + _keyValueType)
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- if idx >= 0 {
- children[idx] = newChildRev
- } else {
- children = append(children, newChildRev)
- }
-
- updatedRev := rev.UpdateChildren(ctx, name, children, branch)
-
- n.makeLatest(branch, updatedRev, nil)
- updatedRev.ChildDrop(name, childRev.GetHash())
-
- return newChildRev
-
- } else {
- logger.Errorf("cannot index into container with no keys")
- }
- } else {
- childRev := rev.GetChildren(name)[0]
- childNode := childRev.getNode()
- newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- updatedRev := rev.UpdateChildren(ctx, name, []Revision{newChildRev}, branch)
- n.makeLatest(branch, updatedRev, nil)
-
- updatedRev.ChildDrop(name, childRev.GetHash())
-
- return newChildRev
- }
-
- return nil
-}
-
-func (n *node) doUpdate(ctx context.Context, branch *Branch, data interface{}, strict bool) Revision {
- logger.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
-
- if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
- // TODO raise error
- logger.Errorw("types-do-not-match: %+v", log.Fields{"actual": reflect.TypeOf(data), "expected": n.Type})
- return nil
- }
-
- if n.GetProxy() != nil {
- logger.Debug("invoking proxy PreUpdate Callbacks")
- n.GetProxy().InvokeCallbacks(ctx, PreUpdate, false, branch.GetLatest(), data)
- }
-
- if strict {
- // TODO: checkAccessViolations(data, Branch.GetLatest.data)
- logger.Warn("access-violations-not-supported")
- }
-
- // The way the model is used, this function is only invoked upon data change. Therefore, to also
- // avoid a deep proto.message comparison (expensive), just create a new branch regardless
- rev := branch.GetLatest().UpdateData(ctx, data, branch)
- changes := []ChangeTuple{{PostUpdate, branch.GetLatest().GetData(), rev.GetData()}}
- n.makeLatest(branch, rev, changes)
-
- return rev
-}
-
-// Add inserts a new node at the specified path with the provided data
-func (n *node) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
- n.mutex.Lock()
- defer n.mutex.Unlock()
-
- logger.Debugw("node-add-request", log.Fields{"path": path, "txid": txid})
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
- if path == "" {
- // TODO raise error
- logger.Errorf("cannot add for non-container mode")
- return nil
- }
-
- var branch *Branch
- if txid == "" {
- branch = n.GetBranch(NONE)
- } else if branch = n.GetBranch(txid); branch == nil {
- branch = makeBranch(n)
- }
-
- rev := branch.GetLatest()
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(n.Type)[name]
-
- var children []Revision
-
- if field.IsContainer {
- if path == "" {
- if field.Key != "" {
- if n.GetProxy() != nil {
- logger.Debug("invoking proxy PreAdd Callbacks")
- n.GetProxy().InvokeCallbacks(ctx, PreAdd, false, data)
- }
-
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- _, key := GetAttributeValue(data, field.Key, 0)
-
- if _, exists := n.findRevByKey(children, field.Key, key.String()); exists != nil {
- // TODO raise error
- logger.Warnw("duplicate-key-found", log.Fields{"key": key.String()})
- return exists
- }
- childRev := n.MakeNode(data, "").Latest()
-
- // Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
- childRev.SetName(name + "/" + key.String())
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- children = append(children, childRev)
-
- updatedRev := rev.UpdateChildren(ctx, name, children, branch)
- changes := []ChangeTuple{{PostAdd, nil, childRev.GetData()}}
-
- n.makeLatest(branch, updatedRev, changes)
-
- return childRev
- }
- logger.Errorf("cannot add to non-keyed container")
-
- } else if field.Key != "" {
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue := field.KeyFromStr(key)
-
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
-
- if childRev == nil {
- return branch.GetLatest()
- }
-
- childNode := childRev.getNode()
- newChildRev := childNode.Add(ctx, path, data, txid, makeBranch)
-
- // Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
- newChildRev.SetName(name + "/" + keyValue.(string))
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- if idx >= 0 {
- children[idx] = newChildRev
- } else {
- children = append(children, newChildRev)
- }
-
- updatedRev := rev.UpdateChildren(ctx, name, children, branch)
- n.makeLatest(branch, updatedRev, nil)
-
- updatedRev.ChildDrop(name, childRev.GetHash())
-
- return newChildRev
- } else {
- logger.Errorf("cannot add to non-keyed container")
- }
- } else {
- logger.Errorf("cannot add to non-container field")
- }
-
- return nil
-}
-
-// Remove eliminates a node at the specified path
-func (n *node) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
- n.mutex.Lock()
- defer n.mutex.Unlock()
-
- logger.Debugw("node-remove-request", log.Fields{"path": path, "txid": txid, "makeBranch": makeBranch})
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
- if path == "" {
- // TODO raise error
- logger.Errorf("cannot remove for non-container mode")
- }
- var branch *Branch
- if txid == "" {
- branch = n.GetBranch(NONE)
- } else if branch = n.GetBranch(txid); branch == nil {
- branch = makeBranch(n)
- }
-
- rev := branch.GetLatest()
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(n.Type)[name]
- var children []Revision
- postAnnouncement := []ChangeTuple{}
-
- if field.IsContainer {
- if path == "" {
- logger.Errorw("cannot-remove-without-key", log.Fields{"name": name, "key": path})
- } else if field.Key != "" {
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- keyValue := field.KeyFromStr(key)
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- if path != "" {
- if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
- childNode := childRev.getNode()
- if childNode.Proxy == nil {
- childNode.Proxy = n.Proxy
- }
- newChildRev := childNode.Remove(ctx, path, txid, makeBranch)
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- if idx >= 0 {
- children[idx] = newChildRev
- } else {
- children = append(children, newChildRev)
- }
-
- rev.SetChildren(name, children)
- branch.GetLatest().Drop(txid, false)
- n.makeLatest(branch, rev, nil)
- }
- return branch.GetLatest()
- }
-
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
- if childRev != nil && idx >= 0 {
- if n.GetProxy() != nil {
- data := childRev.GetData()
- n.GetProxy().InvokeCallbacks(ctx, PreRemove, false, data)
- postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, data, nil})
- } else {
- postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, childRev.GetData(), nil})
- }
-
- childRev.StorageDrop(ctx, txid, true)
- getRevCache().Delete(childRev.GetName())
-
- branch.LatestLock.Lock()
- defer branch.LatestLock.Unlock()
-
- children = append(children[:idx], children[idx+1:]...)
- rev.SetChildren(name, children)
-
- branch.GetLatest().Drop(txid, false)
- n.makeLatest(branch, rev, postAnnouncement)
-
- return rev
- }
- logger.Errorw("failed-to-find-revision", log.Fields{"name": name, "key": keyValue.(string)})
- }
- logger.Errorw("cannot-add-to-non-keyed-container", log.Fields{"name": name, "path": path, "fieldKey": field.Key})
-
- } else {
- logger.Errorw("cannot-add-to-non-container-field", log.Fields{"name": name, "path": path})
- }
-
- return nil
-}
-
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-// MakeBranchFunction is a type for function references intended to create a branch
-type MakeBranchFunction func(*node) *Branch
-
-// MakeBranch creates a new branch for the provided transaction id
-func (n *node) MakeBranch(txid string) *Branch {
- branchPoint := n.GetBranch(NONE).GetLatest()
- branch := NewBranch(n, txid, branchPoint, true)
- n.SetBranch(txid, branch)
- return branch
-}
-
-// DeleteBranch removes a branch with the specified id
-func (n *node) DeleteBranch(txid string) {
- delete(n.Branches, txid)
-}
-
-func (n *node) mergeChild(ctx context.Context, txid string, dryRun bool) func(Revision) Revision {
- f := func(rev Revision) Revision {
- childBranch := rev.GetBranch()
-
- if childBranch.Txid == txid {
- rev, _ = childBranch.Node.MergeBranch(ctx, txid, dryRun)
- }
-
- return rev
- }
- return f
-}
-
-// MergeBranch will integrate the contents of a transaction branch within the latest branch of a given node
-func (n *node) MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error) {
- srcBranch := n.GetBranch(txid)
- dstBranch := n.GetBranch(NONE)
-
- forkRev := srcBranch.Origin
- srcRev := srcBranch.GetLatest()
- dstRev := dstBranch.GetLatest()
-
- rev, changes := Merge3Way(ctx, forkRev, srcRev, dstRev, n.mergeChild(ctx, txid, dryRun), dryRun)
-
- if !dryRun {
- if rev != nil {
- rev.SetName(dstRev.GetName())
- n.makeLatest(dstBranch, rev, changes)
- }
- n.DeleteBranch(txid)
- }
-
- // TODO: return proper error when one occurs
- return rev, nil
-}
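
For orientation, the removed MergeBranch delegated the actual reconciliation to Merge3Way, which is defined elsewhere in this package and not shown in this part of the diff. The sketch below illustrates the usual three-way decision such a merge makes, expressed over plain content hashes rather than the model's Revision type; it is purely illustrative and not the original implementation.

    // decide3Way picks the surviving content of a three-way merge given the
    // hashes of the fork point, the transaction branch tip (src) and the
    // mainline tip (dst). It reports a conflict when both sides diverged
    // from the fork in different ways.
    func decide3Way(fork, src, dst string) (result string, conflict bool) {
        switch {
        case src == fork: // transaction changed nothing; keep the mainline
            return dst, false
        case dst == fork: // mainline unchanged; take the transaction's content
            return src, false
        case src == dst: // both sides converged on the same content
            return src, false
        default: // both sides changed differently
            return dst, true
        }
    }
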
-
-// CreateProxy returns a reference to a sub-tree of the data model
-func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error) {
- return n.createProxy(ctx, path, path, n, exclusive)
-}
-
-func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) (*Proxy, error) {
- logger.Debugw("node-create-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "fullPath": fullPath,
- })
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
- if path == "" {
- return n.makeProxy(path, fullPath, parentNode, exclusive), nil
- }
-
- rev := n.GetBranch(NONE).GetLatest()
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
- var nodeType interface{}
- if len(partition) < 2 {
- path = ""
- nodeType = n.Type
- } else {
- path = partition[1]
- nodeType = parentNode.Type
- }
-
- field := ChildrenFields(nodeType)[name]
-
- if field != nil {
- if field.IsContainer {
- logger.Debugw("container-field", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "name": name,
- })
- if path == "" {
- logger.Debugw("folder-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
- return newNode.makeProxy(path, fullPath, parentNode, exclusive), nil
- } else if field.Key != "" {
- logger.Debugw("key-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue := field.KeyFromStr(key)
- children := make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
-
- var childRev Revision
- if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
- logger.Debugw("found-revision-matching-key-in-memory", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- } else if revs, err := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); err != nil {
- logger.Errorf("failed-to-load-from-persistence")
- return nil, err
- } else if len(revs) > 0 {
- logger.Debugw("found-revision-matching-key-in-db", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- childRev = revs[0]
- } else {
- logger.Debugw("no-revision-matching-key", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- }
- if childRev != nil {
- childNode := childRev.getNode()
- return childNode.createProxy(ctx, path, fullPath, n, exclusive)
- }
- } else {
- logger.Errorw("cannot-access-index-of-empty-container", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "name": name,
- })
- }
- } else {
- logger.Debugw("non-container-field", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "name": name,
- })
- childRev := rev.GetChildren(name)[0]
- childNode := childRev.getNode()
- return childNode.createProxy(ctx, path, fullPath, n, exclusive)
- }
- } else {
- logger.Debugw("field-object-is-nil", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "fullPath": fullPath,
- "name": name,
- })
- }
-
- logger.Warnw("cannot-create-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "fullPath": fullPath,
- "latest-rev": rev.GetHash(),
- })
- return nil, nil
-}
-
-func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
- logger.Debugw("node-make-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "fullPath": fullPath,
- })
-
- r := &root{
- node: n,
- Callbacks: n.Root.GetCallbacks(),
- NotificationCallbacks: n.Root.GetNotificationCallbacks(),
- DirtyNodes: n.Root.DirtyNodes,
- KvStore: n.Root.KvStore,
- Loading: n.Root.Loading,
- RevisionClass: n.Root.RevisionClass,
- }
-
- if n.Proxy == nil {
- logger.Debugw("constructing-new-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Type).Type(),
- "parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
- "path": path,
- "fullPath": fullPath,
- })
- n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
- } else {
- logger.Debugw("node-has-existing-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.GetProxy().Node.Type).Type(),
- "parent-node-type": reflect.ValueOf(n.GetProxy().ParentNode.Type).Type(),
- "path": n.GetProxy().Path,
- "fullPath": n.GetProxy().FullPath,
- })
- if n.GetProxy().Exclusive {
- logger.Error("node is already owned exclusively")
- }
- }
-
- return n.Proxy
-}
-
-func (n *node) SetProxy(proxy *Proxy) {
- n.Proxy = proxy
-}
-
-func (n *node) GetProxy() *Proxy {
- return n.Proxy
-}
-
-func (n *node) GetBranch(key string) *Branch {
- if n.Branches != nil {
- if branch, exists := n.Branches[key]; exists {
- return branch
- }
- }
- return nil
-}
-
-func (n *node) SetBranch(key string, branch *Branch) {
- n.Branches[key] = branch
-}
-
-func (n *node) GetRoot() *root {
- return n.Root
-}
-func (n *node) SetRoot(root *root) {
- n.Root = root
-}
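
The Update, Add, Remove and createProxy methods above all walk paths the same way: strip leading slashes, peel off the first segment as the child field name, and recurse on the remainder. Below is a stand-alone sketch of that convention; the helper name is illustrative and not part of the original API.

    import "strings"

    // splitPath strips leading '/' characters and separates the first path
    // segment (the child field name) from the remainder used for recursion.
    // For example, splitPath("/devices/abc/ports") returns ("devices", "abc/ports").
    func splitPath(path string) (name, rest string) {
        for strings.HasPrefix(path, "/") {
            path = path[1:]
        }
        parts := strings.SplitN(path, "/", 2)
        if len(parts) < 2 {
            return parts[0], ""
        }
        return parts[0], parts[1]
    }
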
diff --git a/db/model/node_test.go b/db/model/node_test.go
deleted file mode 100644
index aadf21a..0000000
--- a/db/model/node_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "crypto/md5"
- "fmt"
- "reflect"
- "testing"
-
- "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-protos/v3/go/common"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
-)
-
-var (
- TestNodeDevice = &voltha.Device{
- Id: "Config-SomeNode-01-new-test",
- Type: "simulated_olt",
- Root: true,
- ParentId: "",
- ParentPortNo: 0,
- Vendor: "voltha-test",
- Model: "GetLatest-voltha-simulated-olt",
- HardwareVersion: "1.0.0",
- FirmwareVersion: "1.0.0",
- Images: &voltha.Images{},
- SerialNumber: "abcdef-123456",
- VendorId: "DEADBEEF-INC",
- Adapter: "simulated_olt",
- Vlan: 1234,
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- ExtraArgs: "",
- ProxyAddress: &voltha.Device_ProxyAddress{},
- AdminState: voltha.AdminState_PREPROVISIONED,
- OperStatus: common.OperStatus_ACTIVE,
- Reason: "",
- ConnectStatus: common.ConnectStatus_REACHABLE,
- Custom: &any.Any{},
- Ports: TestNodePort,
- Flows: &openflow_13.Flows{},
- FlowGroups: &openflow_13.FlowGroups{},
- PmConfigs: &voltha.PmConfigs{},
- ImageDownloads: []*voltha.ImageDownload{},
- }
-
- TestNodeTxid = fmt.Sprintf("%x", md5.Sum([]byte("node_transaction_id")))
- TestNodeRoot = &root{RevisionClass: reflect.TypeOf(NonPersistedRevision{})}
-)
-
-// Exercise node creation code
-// This test will
-func TestNode_01_NewNode(t *testing.T) {
- node := newNode(TestNodeRoot, TestNodeDevice, false, TestNodeTxid)
-
- if reflect.ValueOf(node.Type).Type() != reflect.TypeOf(TestNodeDevice) {
- t.Errorf("Node type does not match original data type: %+v", reflect.ValueOf(node.Type).Type())
- } else if node.GetBranch(TestNodeTxid) == nil || node.GetBranch(TestNodeTxid).Latest == nil {
- t.Errorf("No branch associated to txid: %s", TestNodeTxid)
- } else if node.GetBranch(TestNodeTxid).Latest == nil {
- t.Errorf("Branch has no latest revision : %s", TestNodeTxid)
- } else if node.GetBranch(TestNodeTxid).GetLatest().GetConfig() == nil {
- t.Errorf("Latest revision has no assigned data: %+v", node.GetBranch(TestNodeTxid).GetLatest())
- }
-
- t.Logf("Created new node successfully : %+v\n", node)
-}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
deleted file mode 100644
index 3bc888d..0000000
--- a/db/model/non_persisted_revision.go
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "bytes"
- "context"
- "crypto/md5"
- "fmt"
- "reflect"
- "sort"
- "strings"
- "sync"
- "time"
-
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
-//
-type revCacheSingleton struct {
- sync.RWMutex
- Cache sync.Map
-}
-
-func (s *revCacheSingleton) Get(path string) (interface{}, bool) {
- return s.Cache.Load(path)
-}
-func (s *revCacheSingleton) Set(path string, value interface{}) {
- s.Cache.Store(path, value)
-}
-func (s *revCacheSingleton) Delete(path string) {
- s.Cache.Delete(path)
-}
-
-var revCacheInstance *revCacheSingleton
-var revCacheOnce sync.Once
-
-func getRevCache() *revCacheSingleton {
- revCacheOnce.Do(func() {
- revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
- })
- return revCacheInstance
-}
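
The revision cache above is a lazily initialised, process-wide singleton wrapping a sync.Map keyed by KV path. A short usage sketch follows; the path is made up and rev is assumed to be a Revision already in scope.

    // Store, look up and evict a revision by its KV path.
    getRevCache().Set("devices/device-1234", rev)
    if cached, exists := getRevCache().Get("devices/device-1234"); exists {
        rev = cached.(Revision) // values come back as interface{} and must be asserted
    }
    getRevCache().Delete("devices/device-1234")
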
-
-// NonPersistedRevision -
-type NonPersistedRevision struct {
- mutex sync.RWMutex
- Root *root
- Config *DataRevision
- childrenLock sync.RWMutex
- Children map[string][]Revision
- Hash string
- Branch *Branch
- WeakRef string
- Name string
- lastUpdate time.Time
-}
-
-// NewNonPersistedRevision -
-func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
- r := &NonPersistedRevision{}
- r.Root = root
- r.Branch = branch
- r.Config = NewDataRevision(root, data)
- r.Children = children
- r.Hash = r.hashContent()
- return r
-}
-
-// SetConfig -
-func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Config = config
-}
-
-// GetConfig -
-func (npr *NonPersistedRevision) GetConfig() *DataRevision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- return npr.Config
-}
-
-// SetAllChildren -
-func (npr *NonPersistedRevision) SetAllChildren(children map[string][]Revision) {
- npr.childrenLock.Lock()
- defer npr.childrenLock.Unlock()
- npr.Children = make(map[string][]Revision)
-
- for key, value := range children {
- npr.Children[key] = make([]Revision, len(value))
- copy(npr.Children[key], value)
- }
-}
-
-// SetChildren -
-func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
- npr.childrenLock.Lock()
- defer npr.childrenLock.Unlock()
-
- npr.Children[name] = make([]Revision, len(children))
- copy(npr.Children[name], children)
-}
-
-// GetAllChildren -
-func (npr *NonPersistedRevision) GetAllChildren() map[string][]Revision {
- npr.childrenLock.Lock()
- defer npr.childrenLock.Unlock()
-
- return npr.Children
-}
-
-// GetChildren -
-func (npr *NonPersistedRevision) GetChildren(name string) []Revision {
- npr.childrenLock.Lock()
- defer npr.childrenLock.Unlock()
-
- if _, exists := npr.Children[name]; exists {
- return npr.Children[name]
- }
- return nil
-}
-
-// SetHash -
-func (npr *NonPersistedRevision) SetHash(hash string) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Hash = hash
-}
-
-// GetHash -
-func (npr *NonPersistedRevision) GetHash() string {
- //npr.mutex.Lock()
- //defer npr.mutex.Unlock()
- return npr.Hash
-}
-
-// ClearHash -
-func (npr *NonPersistedRevision) ClearHash() {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Hash = ""
-}
-
-// GetName -
-func (npr *NonPersistedRevision) GetName() string {
- //npr.mutex.Lock()
- //defer npr.mutex.Unlock()
- return npr.Name
-}
-
-// SetName -
-func (npr *NonPersistedRevision) SetName(name string) {
- //npr.mutex.Lock()
- //defer npr.mutex.Unlock()
- npr.Name = name
-}
-
-// SetBranch -
-func (npr *NonPersistedRevision) SetBranch(branch *Branch) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Branch = branch
-}
-
-// GetBranch -
-func (npr *NonPersistedRevision) GetBranch() *Branch {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- return npr.Branch
-}
-
-// GetData -
-func (npr *NonPersistedRevision) GetData() interface{} {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- if npr.Config == nil {
- return nil
- }
- return npr.Config.Data
-}
-
-func (npr *NonPersistedRevision) getNode() *node {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- return npr.Branch.Node
-}
-
-// Finalize -
-func (npr *NonPersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
- npr.Hash = npr.hashContent()
-}
-
-// hashContent generates a hash string based on the contents of the revision.
-// The string should be unique to avoid conflicts with other revisions
-func (npr *NonPersistedRevision) hashContent() string {
- var buffer bytes.Buffer
- var childrenKeys []string
-
- if npr.Config != nil {
- buffer.WriteString(npr.Config.Hash)
- }
-
- if npr.Name != "" {
- buffer.WriteString(npr.Name)
- }
-
- for key := range npr.Children {
- childrenKeys = append(childrenKeys, key)
- }
-
- sort.Strings(childrenKeys)
-
- if len(npr.Children) > 0 {
- // Loop through sorted Children keys
- for _, key := range childrenKeys {
- for _, child := range npr.Children[key] {
- if child != nil && child.GetHash() != "" {
- buffer.WriteString(child.GetHash())
- }
- }
- }
- }
-
- return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
-}
-
-// Get will retrieve the data for the current revision
-func (npr *NonPersistedRevision) Get(depth int) interface{} {
- // 1. Clone the data to avoid any concurrent access issues
- // 2. The current rev might still be pointing to an old config
- // thus, force the revision to get its latest value
- latestRev := npr.GetBranch().GetLatest()
- originalData := proto.Clone(latestRev.GetData().(proto.Message))
- data := originalData
-
- if depth != 0 {
- // FIXME: Traversing the struct through reflection sometimes corrupts the data.
- // Unlike the original python implementation, golang structs are not lazy loaded.
- // Keeping this non-critical logic for now, but Get operations should be forced to
- // depth=0 to avoid going through the following loop.
- for fieldName, field := range ChildrenFields(latestRev.GetData()) {
- childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
- if field.IsContainer {
- for _, rev := range latestRev.GetChildren(fieldName) {
- childData := rev.Get(depth - 1)
- foundEntry := false
- for i := 0; i < childDataHolder.Len(); i++ {
- cdhIf := childDataHolder.Index(i).Interface()
- if cdhIf.(proto.Message).String() == childData.(proto.Message).String() {
- foundEntry = true
- break
- }
- }
- if !foundEntry {
- // avoid duplicates by adding it only if the child was not found in the holder
- childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
- }
- }
- } else {
- if revs := npr.GetBranch().GetLatest().GetChildren(fieldName); len(revs) > 0 {
- rev := revs[0]
- if rev != nil {
- childData := rev.Get(depth - 1)
- if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
- childDataHolder = reflect.ValueOf(childData)
- }
- }
- }
- }
- // Merge child data with cloned object
- reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
- }
- }
-
- result := data
-
- if result != nil {
- // We need to send back a copy of the retrieved object
- result = proto.Clone(data.(proto.Message))
- }
-
- return result
-}
-
-// UpdateData will refresh the data content of the revision
-func (npr *NonPersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
-
- logger.Debugw("update-data", log.Fields{"hash": npr.GetHash(), "current": npr.Config.Data, "provided": data})
-
- // Construct a new revision based on the current one
- newRev := NonPersistedRevision{}
- newRev.Config = NewDataRevision(npr.Root, data)
- newRev.Hash = npr.Hash
- newRev.Root = npr.Root
- newRev.Name = npr.Name
- newRev.Branch = branch
- newRev.lastUpdate = npr.lastUpdate
-
- newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
- newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
- }
-
- newRev.Finalize(ctx, false)
-
- logger.Debugw("update-data-complete", log.Fields{"updated": newRev.Config.Data, "provided": data})
-
- return &newRev
-}
-
-// UpdateChildren will refresh the list of children with the provided ones
-// It will carefully go through the list and ensure that no child is lost
-func (npr *NonPersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
-
- // Construct a new revision based on the current one
- updatedRev := &NonPersistedRevision{}
- updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
- updatedRev.Hash = npr.Hash
- updatedRev.Branch = branch
- updatedRev.Name = npr.Name
- updatedRev.lastUpdate = npr.lastUpdate
-
- updatedRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
- updatedRev.Children[entryName] = append(updatedRev.Children[entryName], childrenEntry...)
- }
-
- var updatedChildren []Revision
-
- // Verify if the map already contains an entry matching the name value
- // If so, we need to retain the contents of that entry and merge them with the provided children revision list
- if existingChildren := branch.GetLatest().GetChildren(name); existingChildren != nil {
- // Construct a map of unique child names with the respective index value
- // for the children in the existing revision as well as the new ones
- existingNames := make(map[string]int)
- newNames := make(map[string]int)
-
- for i, newChild := range children {
- newNames[newChild.GetName()] = i
- }
-
- for i, existingChild := range existingChildren {
- existingNames[existingChild.GetName()] = i
-
- // If an existing entry is not in the new list, add it to the updated list, so it is not forgotten
- if _, exists := newNames[existingChild.GetName()]; !exists {
- updatedChildren = append(updatedChildren, existingChild)
- }
- }
-
- logger.Debugw("existing-children-names", log.Fields{"hash": npr.GetHash(), "names": existingNames})
-
- // Merge existing and new children
- for _, newChild := range children {
- nameIndex, nameExists := existingNames[newChild.GetName()]
-
- // Does the existing list contain a child with that name?
- if nameExists {
- // This function is invoked only when the data has actually changed (current Core usage). Therefore,
- // we need to avoid an expensive deep proto.message comparison and treat the data as an update
- newChild.getNode().SetRoot(existingChildren[nameIndex].getNode().GetRoot())
- updatedChildren = append(updatedChildren, newChild)
- } else {
- logger.Debugw("adding-unknown-child", log.Fields{
- "hash": newChild.GetHash(),
- "data": newChild.GetData(),
- })
-
- // new entry ... just add it
- updatedChildren = append(updatedChildren, newChild)
- }
- }
-
- // Save children in new revision
- updatedRev.SetChildren(name, updatedChildren)
-
- updatedNames := make(map[string]int)
- for i, updatedChild := range updatedChildren {
- updatedNames[updatedChild.GetName()] = i
- }
-
- logger.Debugw("updated-children-names", log.Fields{"hash": npr.GetHash(), "names": updatedNames})
-
- } else {
- // There are no children available, just save the provided ones
- updatedRev.SetChildren(name, children)
- }
-
- updatedRev.Finalize(ctx, false)
-
- return updatedRev
-}
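
Stripped of locking and revision bookkeeping, the child merge above reduces to a name-based rule: keep every existing child whose name does not appear in the update list, and let each update replace its same-named predecessor. A condensed sketch over a minimal stand-in interface (the package's Revision satisfies it via GetName):

    type named interface{ GetName() string }

    // mergeByName returns the existing entries whose names are absent from
    // updates, followed by all updates (which replace same-named entries).
    func mergeByName(existing, updates []named) []named {
        updatedNames := make(map[string]struct{}, len(updates))
        for _, u := range updates {
            updatedNames[u.GetName()] = struct{}{}
        }
        merged := make([]named, 0, len(existing)+len(updates))
        for _, e := range existing {
            if _, ok := updatedNames[e.GetName()]; !ok {
                merged = append(merged, e)
            }
        }
        return append(merged, updates...)
    }
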
-
-// UpdateAllChildren will replace the current list of children with the provided ones
-func (npr *NonPersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
-
- newRev := npr
- newRev.Config = npr.Config
- newRev.Hash = npr.Hash
- newRev.Branch = branch
- newRev.Name = npr.Name
- newRev.lastUpdate = npr.lastUpdate
-
- newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range children {
- newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
- }
- newRev.Finalize(ctx, false)
-
- return newRev
-}
-
-// Drop is used to indicate when a revision is no longer required
-func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- logger.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "name": npr.GetName()})
-}
-
-// ChildDrop will remove a child entry matching the provided parameters from the current revision
-func (npr *NonPersistedRevision) ChildDrop(childType string, childHash string) {
- if childType != "" {
- children := make([]Revision, len(npr.GetChildren(childType)))
- copy(children, npr.GetChildren(childType))
- for i, child := range children {
- if child.GetHash() == childHash {
- children = append(children[:i], children[i+1:]...)
- npr.SetChildren(childType, children)
- break
- }
- }
- }
-}
-
-// ChildDropByName will remove a child entry matching the type and name
-func (npr *NonPersistedRevision) ChildDropByName(childName string) {
- // Extract device type
- parts := strings.SplitN(childName, "/", 2)
- childType := parts[0]
-
- if childType != "" {
- children := make([]Revision, len(npr.GetChildren(childType)))
- copy(children, npr.GetChildren(childType))
- for i, child := range children {
- if child.GetName() == childName {
- children = append(children[:i], children[i+1:]...)
- npr.SetChildren(childType, children)
- break
- }
- }
- }
-}
-
-// SetLastUpdate -
-func (npr *NonPersistedRevision) SetLastUpdate(ts ...time.Time) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
-
- if len(ts) > 0 {
- npr.lastUpdate = ts[0]
- } else {
- npr.lastUpdate = time.Now()
- }
-}
-
-// GetLastUpdate -
-func (npr *NonPersistedRevision) GetLastUpdate() time.Time {
- npr.mutex.RLock()
- defer npr.mutex.RUnlock()
-
- return npr.lastUpdate
-}
-
-func (npr *NonPersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
- // stub... required by interface
- return nil, nil
-}
-
-// SetupWatch -
-func (npr *NonPersistedRevision) SetupWatch(ctx context.Context, key string) {
- // stub ... required by interface
-}
-
-// StorageDrop -
-func (npr *NonPersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
- // stub ... required by interface
-}
-
-func (npr *NonPersistedRevision) getVersion() int64 {
- return -1
-}
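
The 12-character hash produced by hashContent above is simply an MD5 over the config hash, the revision name and the child hashes taken in a deterministic order. The same idea in isolation, slightly simplified in that the child hashes are sorted directly rather than grouped per field:

    import (
        "crypto/md5"
        "fmt"
        "sort"
        "strings"
    )

    // contentHash concatenates the parts in a deterministic order and keeps
    // the first 12 hex characters of the MD5 sum, mirroring hashContent.
    func contentHash(configHash, name string, childHashes []string) string {
        sort.Strings(childHashes)
        payload := configHash + name + strings.Join(childHashes, "")
        return fmt.Sprintf("%x", md5.Sum([]byte(payload)))[:12]
    }
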
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
deleted file mode 100644
index 822b8b2..0000000
--- a/db/model/persisted_revision.go
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "bytes"
- "compress/gzip"
- "context"
- "reflect"
- "strings"
- "sync"
-
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v3/pkg/db"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-// PersistedRevision holds information about a revision meant to be saved in persistent storage
-type PersistedRevision struct {
- Revision
- Compress bool
-
- events chan *kvstore.Event
- kvStore *db.Backend
- mutex sync.RWMutex
- versionMutex sync.RWMutex
- Version int64
- isStored bool
-}
-
-// NewPersistedRevision creates a new instance of a PersistedRevision structure
-func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- pr := &PersistedRevision{}
- pr.kvStore = branch.Node.GetRoot().KvStore
- pr.Version = 1
- pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
- return pr
-}
-
-func (pr *PersistedRevision) getVersion() int64 {
- pr.versionMutex.RLock()
- defer pr.versionMutex.RUnlock()
- return pr.Version
-}
-
-func (pr *PersistedRevision) setVersion(version int64) {
- pr.versionMutex.Lock()
- defer pr.versionMutex.Unlock()
- pr.Version = version
-}
-
-// Finalize is responsible for saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
- pr.store(ctx, skipOnExist)
-}
-
-func (pr *PersistedRevision) store(ctx context.Context, skipOnExist bool) {
- if pr.GetBranch().Txid != "" {
- return
- }
-
- logger.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
-
- // clone the revision data to avoid any race conditions with processes
- // accessing the same data
- cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
-
- if blob, err := proto.Marshal(cloned); err != nil {
- logger.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
- } else {
- if pr.Compress {
- var b bytes.Buffer
- w := gzip.NewWriter(&b)
- if _, err := w.Write(blob); err != nil {
- logger.Errorw("Unable to write a compressed form of p to the underlying io.Writer.", log.Fields{"error": err})
- }
- w.Close()
- blob = b.Bytes()
- }
-
- getRevCache().Set(pr.GetName(), pr)
- if err := pr.kvStore.Put(ctx, pr.GetName(), blob); err != nil {
- logger.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
- } else {
- logger.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
- pr.isStored = true
- }
- }
-}
-
-// UpdateData modifies the information in the data model and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
- logger.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
-
- newNPR := pr.Revision.UpdateData(ctx, data, branch)
-
- newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
- Version: pr.getVersion(),
- }
-
- if newPR.GetHash() != pr.GetHash() {
- newPR.isStored = false
- pr.Drop(branch.Txid, false)
- pr.Drop(branch.Txid, false)
- } else {
- newPR.isStored = true
- }
-
- return newPR
-}
-
-// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
- logger.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
-
- newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
-
- newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
- Version: pr.getVersion(),
- }
-
- if newPR.GetHash() != pr.GetHash() {
- newPR.isStored = false
- pr.Drop(branch.Txid, false)
- } else {
- newPR.isStored = true
- }
-
- return newPR
-}
-
-// UpdateAllChildren modifies the children for all components of a revision and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
- logger.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
-
- newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
-
- newPR := &PersistedRevision{
- Revision: newNPR,
- Compress: pr.Compress,
- kvStore: pr.kvStore,
- events: pr.events,
- Version: pr.getVersion(),
- }
-
- if newPR.GetHash() != pr.GetHash() {
- newPR.isStored = false
- pr.Drop(branch.Txid, false)
- } else {
- newPR.isStored = true
- }
-
- return newPR
-}
-
-// Drop takes care of eliminating a revision hash that is no longer needed
-// and its associated config when required
-func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
- pr.Revision.Drop(txid, includeConfig)
-}
-
-// StorageDrop takes care of eliminating a revision hash that is no longer needed
-// and its associated config when required
-func (pr *PersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
- logger.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
-
- pr.mutex.Lock()
- defer pr.mutex.Unlock()
- if pr.kvStore != nil && txid == "" {
- if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
- logger.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
- } else {
- pr.isStored = false
- }
- } else {
- if includeConfig {
- logger.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
- }
- logger.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
- }
-
- pr.Revision.Drop(txid, includeConfig)
-}
-
-// verifyPersistedEntry checks whether the provided data is already available in memory and applies updates as required
-func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
- keyValue string, txid string, version int64) (response Revision) {
- // Parent which holds the current node entry
- parent := pr.GetBranch().Node.GetRoot()
-
- // Get a copy of the parent's children
- children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
- copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
-
- // Verify if a child with the provided key value can be found
- if childIdx, childRev := pr.getNode().findRevByKey(children, keyName, keyValue); childRev != nil {
- // A child matching the provided key exists in memory
- // Verify if the data differs from what was retrieved from persistence
- // Also check if we are treating a newer revision of the data or not
- if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
- logger.Debugw("revision-data-is-different", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- "in-memory-version": childRev.getVersion(),
- "persisted-version": version,
- })
-
- //
- // Data has changed; replace the child entry and update the parent revision
- //
-
- // BEGIN Lock child -- prevent any incoming changes
- childRev.GetBranch().LatestLock.Lock()
-
- // Update child
- updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
-
- updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
- updatedChildRev.SetLastUpdate()
- updatedChildRev.(*PersistedRevision).setVersion(version)
-
- // Update cache
- getRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
- childRev.Drop(txid, false)
-
- childRev.GetBranch().LatestLock.Unlock()
- // END lock child
-
- // Update child entry
- children[childIdx] = updatedChildRev
-
- // BEGIN lock parent -- Update parent
- parent.GetBranch(NONE).LatestLock.Lock()
-
- updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
- parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
-
- parent.GetBranch(NONE).LatestLock.Unlock()
- // END lock parent
-
- // Drop the previous child revision
- parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
-
- if updatedChildRev != nil {
- logger.Debugw("verify-persisted-entry--adding-child", log.Fields{
- "key": updatedChildRev.GetHash(),
- "name": updatedChildRev.GetName(),
- "data": updatedChildRev.GetData(),
- })
- response = updatedChildRev
- }
- } else {
- logger.Debugw("keeping-revision-data", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- "in-memory-version": childRev.getVersion(),
- "persistence-version": version,
- })
-
- // Update timestamp to reflect when it was last read and to reset tracked timeout
- childRev.SetLastUpdate()
- if childRev.getVersion() < version {
- childRev.(*PersistedRevision).setVersion(version)
- }
- getRevCache().Set(childRev.GetName(), childRev)
- response = childRev
- }
-
- } else {
- // There is no available child with that key value.
- // Create a new child and update the parent revision.
- logger.Debugw("no-such-revision-entry", log.Fields{
- "key": keyValue,
- "name": typeName,
- "data": data,
- "version": version,
- })
-
- // BEGIN child lock
- pr.GetBranch().LatestLock.Lock()
-
- // Construct a new child node with the retrieved persistence data
- childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
-
- // We need to start watching this entry for future changes
- childRev.SetName(typeName + "/" + keyValue)
- childRev.(*PersistedRevision).setVersion(version)
-
- // Add entry to cache
- getRevCache().Set(childRev.GetName(), childRev)
-
- pr.GetBranch().LatestLock.Unlock()
- // END child lock
-
- //
- // Add the child to the parent revision
- //
-
- // BEGIN parent lock
- parent.GetBranch(NONE).LatestLock.Lock()
- children = append(children, childRev)
- updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
- updatedRev.getNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
- parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
- parent.GetBranch(NONE).LatestLock.Unlock()
- // END parent lock
-
- // Child entry is valid and can be included in the response object
- if childRev != nil {
- logger.Debugw("adding-revision-to-response", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- })
- response = childRev
- }
- }
-
- return response
-}
-
-// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
-// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
- pr.mutex.Lock()
- defer pr.mutex.Unlock()
-
- logger.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
-
- var response []Revision
- var err error
-
- for strings.HasPrefix(path, "/") {
- path = path[1:]
- }
-
- if pr.kvStore != nil && path != "" {
- if len(blobs) == 0 {
- logger.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
-
- if blobs, err = pr.kvStore.List(ctx, path); err != nil {
- logger.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
- return nil, err
- }
- }
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- var nodeType interface{}
- if len(partition) < 2 {
- path = ""
- nodeType = pr.GetBranch().Node.Type
- } else {
- path = partition[1]
- nodeType = pr.GetBranch().Node.GetRoot().Type
- }
-
- field := ChildrenFields(nodeType)[name]
-
- if field != nil && field.IsContainer {
- logger.Debugw("parsing-data-blobs", log.Fields{
- "path": path,
- "name": name,
- "size": len(blobs),
- })
-
- for _, blob := range blobs {
- output := blob.Value.([]byte)
-
- data := reflect.New(field.ClassType.Elem())
-
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- logger.Errorw("failed-to-unmarshal", log.Fields{
- "path": path,
- "txid": txid,
- "error": err,
- })
- } else if path == "" {
- if field.Key != "" {
- logger.Debugw("no-path-with-container-key", log.Fields{
- "path": path,
- "txid": txid,
- "data": data.Interface(),
- })
-
- // Retrieve the key identifier value from the data structure
- // based on the field's key attribute
- _, key := GetAttributeValue(data.Interface(), field.Key, 0)
-
- if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
- response = append(response, entry)
- }
- } else {
- logger.Debugw("path-with-no-container-key", log.Fields{
- "path": path,
- "txid": txid,
- "data": data.Interface(),
- })
- }
-
- } else if field.Key != "" {
- logger.Debugw("path-with-container-key", log.Fields{
- "path": path,
- "txid": txid,
- "data": data.Interface(),
- })
- // The request is for a specific entry/id
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue := field.KeyFromStr(key)
-
- if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
- response = append(response, entry)
- }
- }
- }
-
- logger.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
- } else {
- logger.Debugw("cannot-process-field", log.Fields{
- "type": pr.GetBranch().Node.Type,
- "name": name,
- })
- }
- }
-
- return response, nil
-}
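
At its core, the removed store path clones the proto message, marshals it, optionally gzip-compresses the bytes and writes them under the revision name. Below is a trimmed-down sketch of that sequence against the same voltha-lib-go backend; compression is always on here and the function name is illustrative, not the removed implementation verbatim.

    import (
        "bytes"
        "compress/gzip"
        "context"

        "github.com/golang/protobuf/proto"
        "github.com/opencord/voltha-lib-go/v3/pkg/db"
    )

    // storeBlob marshals msg, compresses it and writes it to the KV store at key.
    func storeBlob(ctx context.Context, kv *db.Backend, key string, msg proto.Message) error {
        blob, err := proto.Marshal(proto.Clone(msg))
        if err != nil {
            return err
        }
        var b bytes.Buffer
        w := gzip.NewWriter(&b)
        if _, err := w.Write(blob); err != nil {
            return err
        }
        if err := w.Close(); err != nil {
            return err
        }
        return kv.Put(ctx, key, b.Bytes())
    }
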
diff --git a/db/model/profiling.go b/db/model/profiling.go
deleted file mode 100644
index c82afd7..0000000
--- a/db/model/profiling.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "sync"
-)
-
-// Profiling is used to store performance details collected at runtime
-type Profiling struct {
- sync.RWMutex
- DatabaseRetrieveTime float64
- DatabaseRetrieveCount int
- InMemoryModelTime float64
- InMemoryModelCount int
- InMemoryProcessTime float64
- DatabaseStoreTime float64
- InMemoryLockTime float64
- InMemoryLockCount int
-}
-
-var profilingInstance *Profiling
-var profilingOnce sync.Once
-
-// GetProfiling returns a singleton instance of the Profiling structure
-func GetProfiling() *Profiling {
- profilingOnce.Do(func() {
- profilingInstance = &Profiling{}
- })
- return profilingInstance
-}
-
-// AddToDatabaseRetrieveTime appends a time period to retrieve data from the database
-func (p *Profiling) AddToDatabaseRetrieveTime(period float64) {
- p.Lock()
- defer p.Unlock()
-
- p.DatabaseRetrieveTime += period
- p.DatabaseRetrieveCount++
-}
-
-// AddToInMemoryModelTime appends a time period to construct/deconstruct data in memory
-func (p *Profiling) AddToInMemoryModelTime(period float64) {
- p.Lock()
- defer p.Unlock()
-
- p.InMemoryModelTime += period
- p.InMemoryModelCount++
-}
-
-// AddToInMemoryProcessTime appends a time period to process data
-func (p *Profiling) AddToInMemoryProcessTime(period float64) {
- p.Lock()
- defer p.Unlock()
-
- p.InMemoryProcessTime += period
-}
-
-// AddToDatabaseStoreTime appends a time period to store data in the database
-func (p *Profiling) AddToDatabaseStoreTime(period float64) {
- p.Lock()
- defer p.Unlock()
-
- p.DatabaseStoreTime += period
-}
-
-// AddToInMemoryLockTime appends a time period when a code block was locked
-func (p *Profiling) AddToInMemoryLockTime(period float64) {
- p.Lock()
- defer p.Unlock()
-
- p.InMemoryLockTime += period
- p.InMemoryLockCount++
-}
-
-// Reset initializes the profile counters
-func (p *Profiling) Reset() {
- p.Lock()
- defer p.Unlock()
-
- p.DatabaseRetrieveTime = 0
- p.DatabaseRetrieveCount = 0
- p.InMemoryModelTime = 0
- p.InMemoryModelCount = 0
- p.InMemoryProcessTime = 0
- p.DatabaseStoreTime = 0
- p.InMemoryLockTime = 0
- p.InMemoryLockCount = 0
-}
-
-// Report will provide the current profile counter status
-func (p *Profiling) Report() {
- p.Lock()
- defer p.Unlock()
-
- logger.Infof("[ Profiling Report ]")
- logger.Infof("Database Retrieval : %f", p.DatabaseRetrieveTime)
- logger.Infof("Database Retrieval Count : %d", p.DatabaseRetrieveCount)
- logger.Infof("Avg Database Retrieval : %f", p.DatabaseRetrieveTime/float64(p.DatabaseRetrieveCount))
- logger.Infof("In-Memory Modeling : %f", p.InMemoryModelTime)
- logger.Infof("In-Memory Modeling Count: %d", p.InMemoryModelCount)
- logger.Infof("Avg In-Memory Modeling : %f", p.InMemoryModelTime/float64(p.InMemoryModelCount))
- logger.Infof("In-Memory Locking : %f", p.InMemoryLockTime)
- logger.Infof("In-Memory Locking Count: %d", p.InMemoryLockCount)
- logger.Infof("Avg In-Memory Locking : %f", p.InMemoryLockTime/float64(p.InMemoryLockCount))
-
-}
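
GetProfiling follows the familiar sync.Once singleton pattern, so all callers accumulate into one shared set of counters. A brief usage sketch; the helper name and the timed operation are hypothetical.

    import "time"

    // timeDBRead measures a single KV read and adds its duration to the
    // shared profiling counters; readFn stands in for the actual retrieval.
    func timeDBRead(readFn func()) {
        start := time.Now()
        readFn()
        GetProfiling().AddToDatabaseRetrieveTime(time.Since(start).Seconds())
    }

GetProfiling().Report() can then be called periodically to log the accumulated totals and averages.
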
diff --git a/db/model/profiling_test.go b/db/model/profiling_test.go
deleted file mode 100644
index 8b515af..0000000
--- a/db/model/profiling_test.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "reflect"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestProfiling(t *testing.T) {
- want := &Profiling{}
- result := GetProfiling()
- if reflect.TypeOf(result) != reflect.TypeOf(want) {
- t.Errorf("GetProfiling() = result: %v, want: %v", result, want)
- }
-
- /*
- * GetProfiling() returns a singleton instance of the Profiling structure.
- * Verify this by interchangeably calling methods on the "result" instance
- * returned above and on fresh GetProfiling() calls, then comparing the
- * counters: every call operates on the same single profiling instance.
- */
-
- logger.Info("/***** Unit Test Begin: Profiling Report: *****/")
- result.Report()
-
- GetProfiling().AddToDatabaseRetrieveTime(2.0)
- assert.Equal(t, 2.0, result.DatabaseRetrieveTime)
- assert.Equal(t, 1, result.DatabaseRetrieveCount)
- result.AddToDatabaseRetrieveTime(3.0)
- assert.Equal(t, 5.0, GetProfiling().DatabaseRetrieveTime)
- assert.Equal(t, 2, GetProfiling().DatabaseRetrieveCount)
-
- GetProfiling().AddToInMemoryModelTime(2.0)
- assert.Equal(t, 2.0, result.InMemoryModelTime)
- assert.Equal(t, 1, result.InMemoryModelCount)
- result.AddToInMemoryModelTime(3.0)
- assert.Equal(t, 5.0, GetProfiling().InMemoryModelTime)
- assert.Equal(t, 2, GetProfiling().InMemoryModelCount)
-
- GetProfiling().AddToInMemoryProcessTime(2.0)
- assert.Equal(t, 2.0, result.InMemoryProcessTime)
- result.AddToInMemoryProcessTime(3.0)
- assert.Equal(t, 5.0, GetProfiling().InMemoryProcessTime)
-
- GetProfiling().AddToDatabaseStoreTime(2.0)
- assert.Equal(t, 2.0, result.DatabaseStoreTime)
- result.AddToDatabaseStoreTime(3.0)
- assert.Equal(t, 5.0, GetProfiling().DatabaseStoreTime)
-
- GetProfiling().AddToInMemoryLockTime(2.0)
- assert.Equal(t, 2.0, result.InMemoryLockTime)
- assert.Equal(t, 1, result.InMemoryLockCount)
- result.AddToInMemoryLockTime(3.0)
- assert.Equal(t, 5.0, GetProfiling().InMemoryLockTime)
- assert.Equal(t, 2, GetProfiling().InMemoryLockCount)
-
- logger.Info("/***** Unit Test End: Profiling Report: *****/")
- GetProfiling().Report()
-
- result.Reset()
-}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 73ea70d..997ebe4 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -18,495 +18,149 @@
import (
"context"
- "crypto/md5"
"errors"
"fmt"
- "reflect"
- "runtime"
- "strings"
- "sync"
-
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "reflect"
)
-// OperationContext holds details on the information used during an operation
-type OperationContext struct {
- Path string
- Data interface{}
- FieldName string
- ChildKey string
-}
+// RequestTimestamp attribute used to store a timestamp in the context object
+const RequestTimestamp contextKey = "request-timestamp"
-// NewOperationContext instantiates a new OperationContext structure
-func NewOperationContext(path string, data interface{}, fieldName string, childKey string) *OperationContext {
- oc := &OperationContext{
- Path: path,
- Data: data,
- FieldName: fieldName,
- ChildKey: childKey,
- }
- return oc
-}
-
-// Update applies new data to the context structure
-func (oc *OperationContext) Update(data interface{}) *OperationContext {
- oc.Data = data
- return oc
-}
+type contextKey string
// Proxy holds the information for a specific location with the data model
type Proxy struct {
- mutex sync.RWMutex
- Root *root
- Node *node
- ParentNode *node
- Path string
- FullPath string
- Exclusive bool
- Callbacks map[CallbackType]map[string]*CallbackTuple
- operation ProxyOperation
+ kvStore *db.Backend
+ path string
}
// NewProxy instantiates a new proxy to a specific location
-func NewProxy(root *root, node *node, parentNode *node, path string, fullPath string, exclusive bool) *Proxy {
- callbacks := make(map[CallbackType]map[string]*CallbackTuple)
- if fullPath == "/" {
- fullPath = ""
- }
- p := &Proxy{
- Root: root,
- Node: node,
- ParentNode: parentNode,
- Exclusive: exclusive,
- Path: path,
- FullPath: fullPath,
- Callbacks: callbacks,
- }
- return p
-}
-
-// getRoot returns the root attribute of the proxy
-func (p *Proxy) getRoot() *root {
- return p.Root
-}
-
-// getPath returns the path attribute of the proxy
-func (p *Proxy) getPath() string {
- return p.Path
-}
-
-// getFullPath returns the full path attribute of the proxy
-func (p *Proxy) getFullPath() string {
- return p.FullPath
-}
-
-// getCallbacks returns the full list of callbacks associated to the proxy
-func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
- p.mutex.RLock()
- defer p.mutex.RUnlock()
-
- if p != nil {
- if cb, exists := p.Callbacks[callbackType]; exists {
- return cb
- }
- } else {
- logger.Debugw("proxy-is-nil", log.Fields{"callback-type": callbackType.String()})
- }
- return nil
-}
-
-// getCallback returns a specific callback matching the type and function hash
-func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
- return tuple
- }
- return nil
-}
-
-// setCallbacks applies a callbacks list to a type
-func (p *Proxy) setCallbacks(callbackType CallbackType, callbacks map[string]*CallbackTuple) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- p.Callbacks[callbackType] = callbacks
-}
-
-// setCallback applies a callback to a type and hash value
-func (p *Proxy) setCallback(callbackType CallbackType, funcHash string, tuple *CallbackTuple) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- p.Callbacks[callbackType][funcHash] = tuple
-}
-
-// DeleteCallback removes a callback matching the type and hash
-func (p *Proxy) DeleteCallback(callbackType CallbackType, funcHash string) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- delete(p.Callbacks[callbackType], funcHash)
-}
-
-// ProxyOperation callbackType is an enumerated value to express when a callback should be executed
-type ProxyOperation uint8
-
-// Enumerated list of callback types
-const (
- ProxyNone ProxyOperation = iota
- ProxyGet
- ProxyList
- ProxyAdd
- ProxyUpdate
- ProxyRemove
- ProxyCreate
- ProxyWatch
-)
-
-var proxyOperationTypes = []string{
- "PROXY_NONE",
- "PROXY_GET",
- "PROXY_LIST",
- "PROXY_ADD",
- "PROXY_UPDATE",
- "PROXY_REMOVE",
- "PROXY_CREATE",
- "PROXY_WATCH",
-}
-
-func (t ProxyOperation) String() string {
- return proxyOperationTypes[t]
-}
-
-// GetOperation -
-func (p *Proxy) GetOperation() ProxyOperation {
- p.mutex.RLock()
- defer p.mutex.RUnlock()
- return p.operation
-}
-
-// SetOperation -
-func (p *Proxy) SetOperation(operation ProxyOperation) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- p.operation = operation
-}
-
-// List will retrieve information from the data model at the specified path location
-// A list operation will force access to persistence storage
-func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) (interface{}, error) {
- var effectivePath string
+func NewProxy(kvStore *db.Backend, path string) *Proxy {
if path == "/" {
- effectivePath = p.getFullPath()
- } else {
- effectivePath = p.getFullPath() + path
+ path = ""
}
+ return &Proxy{
+ kvStore: kvStore,
+ path: path,
+ }
+}
- p.SetOperation(ProxyList)
- defer p.SetOperation(ProxyNone)
+// List will retrieve information from the data model at the specified path location, and write it to the target slice.
+// target must be a type of the form *[]<proto.Message Type>. For example: *[]*voltha.Device
+func (p *Proxy) List(ctx context.Context, path string, target interface{}) error {
+ completePath := p.path + path
logger.Debugw("proxy-list", log.Fields{
- "path": path,
- "effective": effectivePath,
- "operation": p.GetOperation(),
+ "path": completePath,
})
- return p.getRoot().List(ctx, path, "", depth, deep, txid)
-}
-// Get will retrieve information from the data model at the specified path location
-func (p *Proxy) Get(ctx context.Context, path string, depth int, deep bool, txid string) (interface{}, error) {
- var effectivePath string
- if path == "/" {
- effectivePath = p.getFullPath()
- } else {
- effectivePath = p.getFullPath() + path
+ // verify type of target is *[]*<type>
+ pointerType := reflect.TypeOf(target) // *[]*<type>
+ if pointerType.Kind() != reflect.Ptr {
+ return errors.New("target is not of type *[]*<type>")
+ }
+ sliceType := pointerType.Elem() // []*type
+ if sliceType.Kind() != reflect.Slice {
+ return errors.New("target is not of type *[]*<type>")
+ }
+ elemType := sliceType.Elem() // *type
+ if !elemType.Implements(reflect.TypeOf((*proto.Message)(nil)).Elem()) {
+ return errors.New("target slice does not contain elements of type proto.Message")
+ }
+ dataType := elemType.Elem() // type
+
+ blobs, err := p.kvStore.List(ctx, completePath)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve %s from kvstore: %s", path, err)
}
- p.SetOperation(ProxyGet)
- defer p.SetOperation(ProxyNone)
-
- logger.Debugw("proxy-get", log.Fields{
- "path": path,
- "effective": effectivePath,
- "operation": p.GetOperation(),
+ logger.Debugw("parsing-data-blobs", log.Fields{
+ "path": path,
+ "size": len(blobs),
})
- return p.getRoot().Get(ctx, path, "", depth, deep, txid)
+ ret := reflect.MakeSlice(sliceType, len(blobs), len(blobs))
+ i := 0
+ for _, blob := range blobs {
+ data := reflect.New(dataType)
+ if err := proto.Unmarshal(blob.Value.([]byte), data.Interface().(proto.Message)); err != nil {
+ return fmt.Errorf("failed to unmarshal %s: %s", blob.Key, err)
+ }
+ ret.Index(i).Set(data)
+ i++
+ }
+ reflect.ValueOf(target).Elem().Set(ret)
+ return nil
+}
+
+// Get will retrieve information from the data model at the specified path location, and write it to target
+func (p *Proxy) Get(ctx context.Context, path string, target proto.Message) (bool, error) {
+ completePath := p.path + path
+
+ logger.Debugw("proxy-get", log.Fields{
+ "path": completePath,
+ })
+
+ blob, err := p.kvStore.Get(ctx, completePath)
+ if err != nil {
+ return false, fmt.Errorf("failed to retrieve %s from kvstore: %s", path, err)
+ } else if blob == nil {
+ return false, nil // this blob does not exist
+ }
+
+ logger.Debugw("parsing-data-blobs", log.Fields{
+ "path": path,
+ })
+
+ if err := proto.Unmarshal(blob.Value.([]byte), target); err != nil {
+ return false, fmt.Errorf("failed to unmarshal %s: %s", blob.Key, err)
+ }
+ return true, nil
}
// Update will modify information in the data model at the specified location with the provided data
-func (p *Proxy) Update(ctx context.Context, path string, data interface{}, strict bool, txid string) (interface{}, error) {
- if !strings.HasPrefix(path, "/") {
- logger.Errorf("invalid path: %s", path)
- return nil, fmt.Errorf("invalid path: %s", path)
- }
- var fullPath string
- var effectivePath string
- if path == "/" {
- fullPath = p.getPath()
- effectivePath = p.getFullPath()
- } else {
- fullPath = p.getPath() + path
- effectivePath = p.getFullPath() + path
- }
-
- p.SetOperation(ProxyUpdate)
- defer p.SetOperation(ProxyNone)
-
- logger.Debugw("proxy-update", log.Fields{
- "path": path,
- "effective": effectivePath,
- "full": fullPath,
- "operation": p.GetOperation(),
- })
-
- result := p.getRoot().Update(ctx, fullPath, data, strict, txid, nil)
-
- if result != nil {
- return result.GetData(), nil
- }
-
- return nil, nil
+func (p *Proxy) Update(ctx context.Context, path string, data proto.Message) error {
+ return p.add(ctx, path, data)
}
// AddWithID will insert new data at specified location.
-// This method also allows the user to specify the ID of the data entry to ensure
-// that access control is active while inserting the information.
-func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data interface{}, txid string) (interface{}, error) {
- if !strings.HasPrefix(path, "/") {
- logger.Errorf("invalid path: %s", path)
- return nil, fmt.Errorf("invalid path: %s", path)
- }
- var fullPath string
- var effectivePath string
- if path == "/" {
- fullPath = p.getPath()
- effectivePath = p.getFullPath()
- } else {
- fullPath = p.getPath() + path
- effectivePath = p.getFullPath() + path + "/" + id
- }
-
- p.SetOperation(ProxyAdd)
- defer p.SetOperation(ProxyNone)
-
- logger.Debugw("proxy-add-with-id", log.Fields{
- "path": path,
- "effective": effectivePath,
- "full": fullPath,
- "operation": p.GetOperation(),
- })
-
- result := p.getRoot().Add(ctx, fullPath, data, txid, nil)
-
- if result != nil {
- return result.GetData(), nil
- }
-
- return nil, nil
+// This method also allows the user to specify the ID.
+func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data proto.Message) error {
+ return p.add(ctx, path+"/"+id, data)
}
-// Add will insert new data at specified location.
-func (p *Proxy) Add(ctx context.Context, path string, data interface{}, txid string) (interface{}, error) {
- if !strings.HasPrefix(path, "/") {
- logger.Errorf("invalid path: %s", path)
- return nil, fmt.Errorf("invalid path: %s", path)
- }
- var fullPath string
- var effectivePath string
- if path == "/" {
- fullPath = p.getPath()
- effectivePath = p.getFullPath()
- } else {
- fullPath = p.getPath() + path
- effectivePath = p.getFullPath() + path
- }
-
- p.SetOperation(ProxyAdd)
- defer p.SetOperation(ProxyNone)
+// add will insert new data at the specified location.
+func (p *Proxy) add(ctx context.Context, path string, data proto.Message) error {
+ completePath := p.path + path
logger.Debugw("proxy-add", log.Fields{
- "path": path,
- "effective": effectivePath,
- "full": fullPath,
- "operation": p.GetOperation(),
+ "path": completePath,
})
- result := p.getRoot().Add(ctx, fullPath, data, txid, nil)
-
- if result != nil {
- return result.GetData(), nil
+ blob, err := proto.Marshal(data)
+ if err != nil {
+ return fmt.Errorf("unable to save to kvStore, error marshalling: %s", err)
}
- return nil, nil
+ if err := p.kvStore.Put(ctx, completePath, blob); err != nil {
+ return fmt.Errorf("unable to write to kvStore: %s", err)
+ }
+ return nil
}
// Remove will delete an entry at the specified location
-func (p *Proxy) Remove(ctx context.Context, path string, txid string) (interface{}, error) {
- if !strings.HasPrefix(path, "/") {
- logger.Errorf("invalid path: %s", path)
- return nil, fmt.Errorf("invalid path: %s", path)
- }
- var fullPath string
- var effectivePath string
- if path == "/" {
- fullPath = p.getPath()
- effectivePath = p.getFullPath()
- } else {
- fullPath = p.getPath() + path
- effectivePath = p.getFullPath() + path
- }
-
- p.SetOperation(ProxyRemove)
- defer p.SetOperation(ProxyNone)
+func (p *Proxy) Remove(ctx context.Context, path string) error {
+ completePath := p.path + path
logger.Debugw("proxy-remove", log.Fields{
- "path": path,
- "effective": effectivePath,
- "full": fullPath,
- "operation": p.GetOperation(),
+ "path": completePath,
})
- result := p.getRoot().Remove(ctx, fullPath, txid, nil)
-
- if result != nil {
- return result.GetData(), nil
+ if err := p.kvStore.Delete(ctx, completePath); err != nil {
+ return fmt.Errorf("unable to delete %s in kvStore: %s", completePath, err)
}
-
- return nil, nil
-}
-
-// CreateProxy to interact with specific path directly
-func (p *Proxy) CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error) {
- if !strings.HasPrefix(path, "/") {
- logger.Errorf("invalid path: %s", path)
- return nil, fmt.Errorf("invalid path: %s", path)
- }
-
- var fullPath string
- var effectivePath string
- if path == "/" {
- fullPath = p.getPath()
- effectivePath = p.getFullPath()
- } else {
- fullPath = p.getPath() + path
- effectivePath = p.getFullPath() + path
- }
-
- p.SetOperation(ProxyCreate)
- defer p.SetOperation(ProxyNone)
-
- logger.Debugw("proxy-create", log.Fields{
- "path": path,
- "effective": effectivePath,
- "full": fullPath,
- "operation": p.GetOperation(),
- })
-
- return p.getRoot().CreateProxy(ctx, fullPath, exclusive)
-}
-
-// OpenTransaction creates a new transaction branch to isolate operations made to the data model
-func (p *Proxy) OpenTransaction() *Transaction {
- txid := p.getRoot().MakeTxBranch()
- return NewTransaction(p, txid)
-}
-
-// commitTransaction will apply and merge modifications made in the transaction branch to the data model
-func (p *Proxy) commitTransaction(ctx context.Context, txid string) {
- p.getRoot().FoldTxBranch(ctx, txid)
-}
-
-// cancelTransaction will terminate a transaction branch along will all changes within it
-func (p *Proxy) cancelTransaction(txid string) {
- p.getRoot().DeleteTxBranch(txid)
-}
-
-// CallbackFunction is a type used to define callback functions
-type CallbackFunction func(ctx context.Context, args ...interface{}) interface{}
-
-// CallbackTuple holds the function and arguments details of a callback
-type CallbackTuple struct {
- callback CallbackFunction
- args []interface{}
-}
-
-// Execute will process the a callback with its provided arguments
-func (tuple *CallbackTuple) Execute(ctx context.Context, contextArgs []interface{}) interface{} {
- args := []interface{}{}
-
- args = append(args, tuple.args...)
-
- args = append(args, contextArgs...)
-
- return tuple.callback(ctx, args...)
-}
-
-// RegisterCallback associates a callback to the proxy
-func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
- if p.getCallbacks(callbackType) == nil {
- p.setCallbacks(callbackType, make(map[string]*CallbackTuple))
- }
- funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
- logger.Debugf("value of function: %s", funcName)
- funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
-
- p.setCallback(callbackType, funcHash, &CallbackTuple{callback, args})
-}
-
-// UnregisterCallback removes references to a callback within a proxy
-func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
- if p.getCallbacks(callbackType) == nil {
- logger.Errorf("no such callback type - %s", callbackType.String())
- return
- }
-
- funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
- funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
-
- logger.Debugf("value of function: %s", funcName)
-
- if p.getCallback(callbackType, funcHash) == nil {
- logger.Errorf("function with hash value: '%s' not registered with callback type: '%s'", funcHash, callbackType)
- return
- }
-
- p.DeleteCallback(callbackType, funcHash)
-}
-
-func (p *Proxy) invoke(ctx context.Context, callback *CallbackTuple, context []interface{}) (result interface{}, err error) {
- defer func() {
- if r := recover(); r != nil {
- errStr := fmt.Sprintf("callback error occurred: %+v", r)
- err = errors.New(errStr)
- logger.Error(errStr)
- }
- }()
-
- result = callback.Execute(ctx, context)
-
- return result, err
-}
-
-// InvokeCallbacks executes all callbacks associated to a specific type
-func (p *Proxy) InvokeCallbacks(ctx context.Context, args ...interface{}) (result interface{}) {
- callbackType := args[0].(CallbackType)
- proceedOnError := args[1].(bool)
- context := args[2:]
-
- var err error
-
- if callbacks := p.getCallbacks(callbackType); callbacks != nil {
- p.mutex.Lock()
- for _, callback := range callbacks {
- if result, err = p.invoke(ctx, callback, context); err != nil {
- if !proceedOnError {
- logger.Info("An error occurred. Stopping callback invocation")
- break
- }
- logger.Info("An error occurred. Invoking next callback")
- }
- }
- p.mutex.Unlock()
- }
-
- return result
+ return nil
}
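
For context (illustrative only, not part of this change): a minimal sketch of how the reworked Proxy above is intended to be used against a KV backend. The helper name exerciseProxy and the kvBackend parameter are hypothetical; the sketch assumes a configured *db.Backend (e.g. etcd-backed) and the voltha protos, with error handling kept minimal.

package example

import (
	"context"
	"fmt"

	"github.com/opencord/voltha-go/db/model"
	"github.com/opencord/voltha-lib-go/v3/pkg/db"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// exerciseProxy walks the simplified CRUD surface: AddWithID, Get, List, Update, Remove.
func exerciseProxy(ctx context.Context, kvBackend *db.Backend) error {
	proxy := model.NewProxy(kvBackend, "/")

	// Store a device under /devices/<id>.
	device := &voltha.Device{Id: "0001deadbeef", Type: "simulated_olt"}
	if err := proxy.AddWithID(ctx, "devices", device.Id, device); err != nil {
		return err
	}

	// Get reports through its bool result whether the key existed.
	got := &voltha.Device{}
	if have, err := proxy.Get(ctx, "devices/"+device.Id, got); err != nil {
		return err
	} else if !have {
		return fmt.Errorf("device %s not found", device.Id)
	}

	// List unmarshals every blob under the prefix into the target slice,
	// which must be of the form *[]*<proto.Message type>.
	var devices []*voltha.Device
	if err := proxy.List(ctx, "devices", &devices); err != nil {
		return err
	}

	// Update re-marshals and overwrites the blob; Remove deletes the key.
	got.FirmwareVersion = "1"
	if err := proxy.Update(ctx, "devices/"+device.Id, got); err != nil {
		return err
	}
	return proxy.Remove(ctx, "devices/"+device.Id)
}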
diff --git a/db/model/proxy_load_test.go b/db/model/proxy_load_test.go
deleted file mode 100644
index 25903ed..0000000
--- a/db/model/proxy_load_test.go
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package model
-
-import (
- "context"
- "encoding/hex"
- "math/rand"
- "reflect"
- "strconv"
- "sync"
- "testing"
-
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-protos/v3/go/common"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/stretchr/testify/assert"
-)
-
-var (
- BenchmarkProxyRoot Root
- BenchmarkProxyDeviceProxy *Proxy
- BenchmarkProxyPLT *proxyLoadTest
- BenchmarkProxyLogger log.Logger
-)
-
-type proxyLoadChanges struct {
- ID string
- Before interface{}
- After interface{}
-}
-type proxyLoadTest struct {
- mutex sync.RWMutex
-
- addMutex sync.RWMutex
- addedDevices []string
-
- firmwareMutex sync.RWMutex
- updatedFirmwares []proxyLoadChanges
- flowMutex sync.RWMutex
- updatedFlows []proxyLoadChanges
-
- preAddExecuted bool
- postAddExecuted bool
- preUpdateExecuted bool
- postUpdateExecuted bool
-}
-
-func (plt *proxyLoadTest) SetPreAddExecuted(status bool) {
- plt.mutex.Lock()
- defer plt.mutex.Unlock()
- plt.preAddExecuted = status
-}
-func (plt *proxyLoadTest) SetPostAddExecuted(status bool) {
- plt.mutex.Lock()
- defer plt.mutex.Unlock()
- plt.postAddExecuted = status
-}
-func (plt *proxyLoadTest) SetPreUpdateExecuted(status bool) {
- plt.mutex.Lock()
- defer plt.mutex.Unlock()
- plt.preUpdateExecuted = status
-}
-func (plt *proxyLoadTest) SetPostUpdateExecuted(status bool) {
- plt.mutex.Lock()
- defer plt.mutex.Unlock()
- plt.postUpdateExecuted = status
-}
-
-func init() {
- var err error
- BenchmarkProxyRoot = NewRoot(&voltha.Voltha{}, nil)
-
- BenchmarkProxyLogger, _ = log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"})
- //log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
- //Setup default logger - applies for packages that do not have specific logger set
- if _, err := log.SetDefaultLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"}); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
- }
-
- // Update all loggers (provisioned via init) with a common field
- if err := log.UpdateAllLoggers(log.Fields{"instanceId": "PLT"}); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
- }
- log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
-
- if BenchmarkProxyDeviceProxy, err = BenchmarkProxyRoot.CreateProxy(context.Background(), "/", false); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create benchmark proxy")
- }
- // Register ADD instructions callbacks
- BenchmarkProxyPLT = &proxyLoadTest{}
-
- BenchmarkProxyDeviceProxy.RegisterCallback(PreAdd, commonCallbackFunc, "PreAdd", BenchmarkProxyPLT.SetPreAddExecuted)
- BenchmarkProxyDeviceProxy.RegisterCallback(PostAdd, commonCallbackFunc, "PostAdd", BenchmarkProxyPLT.SetPostAddExecuted)
-
- //// Register UPDATE instructions callbacks
- BenchmarkProxyDeviceProxy.RegisterCallback(PreUpdate, commonCallbackFunc, "PreUpdate", BenchmarkProxyPLT.SetPreUpdateExecuted)
- BenchmarkProxyDeviceProxy.RegisterCallback(PostUpdate, commonCallbackFunc, "PostUpdate", BenchmarkProxyPLT.SetPostUpdateExecuted)
-
-}
-
-func BenchmarkProxy_AddDevice(b *testing.B) {
- var err error
- defer GetProfiling().Report()
- b.RunParallel(func(pb *testing.PB) {
- b.Log("Started adding devices")
- for pb.Next() {
- ltPorts := []*voltha.Port{
- {
- PortNo: 123,
- Label: "lt-port-0",
- Type: voltha.Port_PON_OLT,
- AdminState: common.AdminState_ENABLED,
- OperStatus: common.OperStatus_ACTIVE,
- DeviceId: "lt-port-0-device-id",
- Peers: []*voltha.Port_PeerPort{},
- },
- }
-
- ltStats := &openflow_13.OfpFlowStats{
- Id: 1000,
- }
- ltFlows := &openflow_13.Flows{
- Items: []*openflow_13.OfpFlowStats{ltStats},
- }
- ltDevice := &voltha.Device{
- Id: "",
- Type: "simulated_olt",
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- AdminState: voltha.AdminState_PREPROVISIONED,
- Flows: ltFlows,
- Ports: ltPorts,
- }
-
- ltDevIDBin, _ := uuid.New().MarshalBinary()
- ltDevID := "0001" + hex.EncodeToString(ltDevIDBin)[:12]
- ltDevice.Id = ltDevID
-
- BenchmarkProxyPLT.SetPreAddExecuted(false)
- BenchmarkProxyPLT.SetPostAddExecuted(false)
-
- var added interface{}
- // Add the device
- if added, err = BenchmarkProxyDeviceProxy.AddWithID(context.Background(), "/devices", ltDevID, ltDevice, ""); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create proxy")
- }
- if added == nil {
- BenchmarkProxyLogger.Errorf("Failed to add device: %+v", ltDevice)
- continue
- } else {
- BenchmarkProxyLogger.Infof("Device was added 1: %+v", added)
- }
-
- BenchmarkProxyPLT.addMutex.Lock()
- BenchmarkProxyPLT.addedDevices = append(BenchmarkProxyPLT.addedDevices, added.(*voltha.Device).Id)
- BenchmarkProxyPLT.addMutex.Unlock()
- }
- })
-
- BenchmarkProxyLogger.Infof("Number of added devices : %d", len(BenchmarkProxyPLT.addedDevices))
-}
-
-func BenchmarkProxy_UpdateFirmware(b *testing.B) {
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- //for i:=0; i < b.N; i++ {
-
- if len(BenchmarkProxyPLT.addedDevices) > 0 {
- var target interface{}
- randomID := BenchmarkProxyPLT.addedDevices[rand.Intn(len(BenchmarkProxyPLT.addedDevices))]
- firmProxy, err := BenchmarkProxyRoot.CreateProxy(context.Background(), "/", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create firmware proxy")
- }
- target, err = firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
- "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to create target due to error %v", err)
- assert.NotNil(b, err)
- }
- if !reflect.ValueOf(target).IsValid() {
- BenchmarkProxyLogger.Errorf("Failed to find device: %s %+v", randomID, target)
- continue
- }
-
- BenchmarkProxyPLT.SetPreUpdateExecuted(false)
- BenchmarkProxyPLT.SetPostUpdateExecuted(false)
- firmProxy.RegisterCallback(PreUpdate, commonCallbackFunc, "PreUpdate", BenchmarkProxyPLT.SetPreUpdateExecuted)
- firmProxy.RegisterCallback(PostUpdate, commonCallbackFunc, "PostUpdate", BenchmarkProxyPLT.SetPostUpdateExecuted)
-
- var fwVersion int
-
- before := target.(*voltha.Device).FirmwareVersion
- if target.(*voltha.Device).FirmwareVersion == "n/a" {
- fwVersion = 0
- } else {
- fwVersion, _ = strconv.Atoi(target.(*voltha.Device).FirmwareVersion)
- fwVersion++
- }
-
- target.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
- after := target.(*voltha.Device).FirmwareVersion
-
- var updated interface{}
- if updated, err = firmProxy.Update(context.Background(), "/devices/"+randomID, target.(*voltha.Device), false, ""); err != nil {
- BenchmarkProxyLogger.Errorf("Failed to update firmware proxy due to error %v", err)
- assert.NotNil(b, err)
- }
- if updated == nil {
- BenchmarkProxyLogger.Errorf("Failed to update device: %+v", target)
- continue
- } else {
- BenchmarkProxyLogger.Infof("Device was updated : %+v", updated)
-
- }
-
- d, err := firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get device info from firmware proxy due to error %v", err)
- assert.NotNil(b, err)
- }
- if !reflect.ValueOf(d).IsValid() {
- BenchmarkProxyLogger.Errorf("Failed to get device: %s", randomID)
- continue
- } else if d.(*voltha.Device).FirmwareVersion == after {
- BenchmarkProxyLogger.Infof("Imm Device was updated with new value: %s %+v", randomID, d)
- } else if d.(*voltha.Device).FirmwareVersion == before {
- BenchmarkProxyLogger.Errorf("Imm Device kept old value: %s %+v %+v", randomID, d, target)
- } else {
- BenchmarkProxyLogger.Errorf("Imm Device has unknown value: %s %+v %+v", randomID, d, target)
- }
-
- BenchmarkProxyPLT.firmwareMutex.Lock()
-
- BenchmarkProxyPLT.updatedFirmwares = append(
- BenchmarkProxyPLT.updatedFirmwares,
- proxyLoadChanges{ID: randomID, Before: before, After: after},
- )
- BenchmarkProxyPLT.firmwareMutex.Unlock()
- }
- }
- })
-}
-
-func BenchmarkProxy_UpdateFlows(b *testing.B) {
- b.RunParallel(func(pb *testing.PB) {
- for pb.Next() {
- if len(BenchmarkProxyPLT.addedDevices) > 0 {
- randomID := BenchmarkProxyPLT.addedDevices[rand.Intn(len(BenchmarkProxyPLT.addedDevices))]
-
- flowsProxy, err := BenchmarkProxyRoot.CreateProxy(context.Background(), "/devices/"+randomID+"/flows", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create flows proxy")
- }
- flows, err := flowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from flows proxy due to error: %v", err)
- assert.NotNil(b, err)
- }
-
- before := flows.(*openflow_13.Flows).Items[0].TableId
- flows.(*openflow_13.Flows).Items[0].TableId = uint32(rand.Intn(3000))
- after := flows.(*openflow_13.Flows).Items[0].TableId
-
- flowsProxy.RegisterCallback(
- PreUpdate,
- commonCallback2,
- )
- flowsProxy.RegisterCallback(
- PostUpdate,
- commonCallback2,
- )
-
- var updated interface{}
- if updated, err = flowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); err != nil {
- BenchmarkProxyLogger.Errorf("Cannot update flows proxy due to error: %v", err)
- assert.NotNil(b, err)
- }
- if updated == nil {
- b.Errorf("Failed to update flows for device: %+v", flows)
- } else {
- BenchmarkProxyLogger.Infof("Flows were updated : %+v", updated)
- }
- BenchmarkProxyPLT.flowMutex.Lock()
- BenchmarkProxyPLT.updatedFlows = append(
- BenchmarkProxyPLT.updatedFlows,
- proxyLoadChanges{ID: randomID, Before: before, After: after},
- )
- BenchmarkProxyPLT.flowMutex.Unlock()
- }
- }
- })
-}
-
-func BenchmarkProxy_GetDevices(b *testing.B) {
- //traverseBranches(BenchmarkProxy_DeviceProxy.Root.node.Branches[NONE].GetLatest(), 0)
-
- for i := 0; i < len(BenchmarkProxyPLT.addedDevices); i++ {
- devToGet := BenchmarkProxyPLT.addedDevices[i]
- // Verify that the added device can now be retrieved
- d, err := BenchmarkProxyDeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get device info from device proxy due to error: %v", err)
- assert.NotNil(b, err)
- }
- if !reflect.ValueOf(d).IsValid() {
- BenchmarkProxyLogger.Errorf("Failed to get device: %s", devToGet)
- continue
- } else {
- BenchmarkProxyLogger.Infof("Got device: %s %+v", devToGet, d)
- }
- }
-}
-
-func BenchmarkProxy_GetUpdatedFirmware(b *testing.B) {
- for i := 0; i < len(BenchmarkProxyPLT.updatedFirmwares); i++ {
- devToGet := BenchmarkProxyPLT.updatedFirmwares[i].ID
- // Verify that the updated device can be retrieved and that the updates were actually applied
- d, err := BenchmarkProxyDeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get device info from device proxy due to error: %v", err)
- assert.NotNil(b, err)
- }
- if !reflect.ValueOf(d).IsValid() {
- BenchmarkProxyLogger.Errorf("Failed to get device: %s", devToGet)
- continue
- } else if d.(*voltha.Device).FirmwareVersion == BenchmarkProxyPLT.updatedFirmwares[i].After.(string) {
- BenchmarkProxyLogger.Infof("Device was updated with new value: %s %+v", devToGet, d)
- } else if d.(*voltha.Device).FirmwareVersion == BenchmarkProxyPLT.updatedFirmwares[i].Before.(string) {
- BenchmarkProxyLogger.Errorf("Device kept old value: %s %+v %+v", devToGet, d, BenchmarkProxyPLT.updatedFirmwares[i])
- } else {
- BenchmarkProxyLogger.Errorf("Device has unknown value: %s %+v %+v", devToGet, d, BenchmarkProxyPLT.updatedFirmwares[i])
- }
- }
-}
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 5bfd8d8..683e0a4 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -19,23 +19,22 @@
"context"
"encoding/hex"
"encoding/json"
- "math/rand"
- "reflect"
- "strconv"
- "testing"
- "time"
-
- "github.com/golang/protobuf/proto"
"github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/common"
"github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/stretchr/testify/assert"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
)
var (
- TestProxyRoot Root
+ BenchmarkProxyLogger log.Logger
TestProxyRootLogicalDevice *Proxy
TestProxyRootDevice *Proxy
TestProxyRootAdapter *Proxy
@@ -54,17 +53,22 @@
)
func init() {
- var err error
- TestProxyRoot = NewRoot(&voltha.Voltha{}, nil)
- if TestProxyRootLogicalDevice, err = TestProxyRoot.CreateProxy(context.Background(), "/", false); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create logical device proxy")
+ BenchmarkProxyLogger, _ = log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"})
+ //log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
- if TestProxyRootDevice, err = TestProxyRoot.CreateProxy(context.Background(), "/", false); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create device proxy")
+
+ // Update all loggers (provisioned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
- if TestProxyRootAdapter, err = TestProxyRoot.CreateProxy(context.Background(), "/", false); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create adapter proxy")
- }
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
+
+ TestProxyRootLogicalDevice = NewProxy(mockBackend, "/")
+ TestProxyRootDevice = NewProxy(mockBackend, "/")
+ TestProxyRootAdapter = NewProxy(mockBackend, "/")
TestProxyLogicalPorts = []*voltha.LogicalPort{
{
@@ -115,51 +119,68 @@
}
}
+type mockKV struct {
+ kvstore.Client // pretend we implement everything
+
+ mutex sync.RWMutex
+ data map[string]interface{}
+}
+
+func (kv *mockKV) List(_ context.Context, key string) (map[string]*kvstore.KVPair, error) {
+ kv.mutex.RLock()
+ defer kv.mutex.RUnlock()
+
+ ret := make(map[string]*kvstore.KVPair, len(kv.data))
+ for k, v := range kv.data {
+ if strings.HasPrefix(k, key) {
+ ret[k] = &kvstore.KVPair{Key: k, Value: v}
+ }
+ }
+ return ret, nil
+}
+func (kv *mockKV) Get(_ context.Context, key string) (*kvstore.KVPair, error) {
+ kv.mutex.RLock()
+ defer kv.mutex.RUnlock()
+
+ if val, have := kv.data[key]; have {
+ return &kvstore.KVPair{Key: key, Value: val}, nil
+ }
+ return nil, nil
+}
+func (kv *mockKV) Put(_ context.Context, key string, value interface{}) error {
+ kv.mutex.Lock()
+ defer kv.mutex.Unlock()
+
+ kv.data[key] = value
+ return nil
+}
+func (kv *mockKV) Delete(_ context.Context, key string) error {
+ kv.mutex.Lock()
+ defer kv.mutex.Unlock()
+
+ delete(kv.data, key)
+ return nil
+}
+
+var mockBackend = &db.Backend{Client: &mockKV{data: make(map[string]interface{})}}
+
func TestProxy_1_1_1_Add_NewDevice(t *testing.T) {
devIDBin, _ := uuid.New().MarshalBinary()
TestProxyDeviceID = "0001" + hex.EncodeToString(devIDBin)[:12]
TestProxyDevice.Id = TestProxyDeviceID
- preAddExecuted := make(chan struct{})
- postAddExecuted := make(chan struct{})
- preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
-
- devicesProxy, err := TestProxyRoot.CreateProxy(context.Background(), "/devices", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create devices proxy")
- }
- devicesProxy.RegisterCallback(PreAdd, commonCallback2, "PRE_ADD Device container changes")
- devicesProxy.RegisterCallback(PostAdd, commonCallback2, "POST_ADD Device container changes")
-
- // Register ADD instructions callbacks
- TestProxyRootDevice.RegisterCallback(PreAdd, commonChanCallback, "PreAdd instructions", &preAddExecutedPtr)
- TestProxyRootDevice.RegisterCallback(PostAdd, commonChanCallback, "PostAdd instructions", &postAddExecutedPtr)
-
- added, err := TestProxyRootDevice.Add(context.Background(), "/devices", TestProxyDevice, "")
- if err != nil {
+ if err := TestProxyRootDevice.AddWithID(context.Background(), "devices", TestProxyDeviceID, TestProxyDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add test proxy device due to error: %v", err)
- assert.NotNil(t, err)
+ t.Errorf("failed to add device: %s", err)
}
- if added == nil {
- t.Error("Failed to add device")
- } else {
- t.Logf("Added device : %+v", added)
- }
-
- if !verifyGotResponse(preAddExecuted) {
- t.Error("PreAdd callback was not executed")
- }
- if !verifyGotResponse(postAddExecuted) {
- t.Error("PostAdd callback was not executed")
- }
+ t.Logf("Added device : %+v", TestProxyDevice.Id)
// Verify that the added device can now be retrieved
- d, err := TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyDeviceID, 0, false, "")
- if err != nil {
+ d := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -170,98 +191,74 @@
func TestProxy_1_1_2_Add_ExistingDevice(t *testing.T) {
TestProxyDevice.Id = TestProxyDeviceID
- added, err := TestProxyRootDevice.Add(context.Background(), "/devices", TestProxyDevice, "")
- if err != nil {
+ if err := TestProxyRootDevice.add(context.Background(), "devices", TestProxyDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add device to test proxy due to error: %v", err)
assert.NotNil(t, err)
}
- if added.(proto.Message).String() != reflect.ValueOf(TestProxyDevice).Interface().(proto.Message).String() {
- t.Errorf("Devices don't match - existing: %+v returned: %+v", TestProxyLogicalDevice, added)
- }
-}
-func verifyGotResponse(callbackIndicator <-chan struct{}) bool {
- timeout := time.After(1 * time.Second)
- // Wait until the channel closes, or we time out
- select {
- case <-callbackIndicator:
- // Received response successfully
- return true
-
- case <-timeout:
- // Got a timeout! fail with a timeout error
- return false
+ d := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d); err != nil {
+ BenchmarkProxyLogger.Errorf("Failed get device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ } else if !have {
+ t.Error("Failed to find added device")
+ } else {
+ if d.String() != TestProxyDevice.String() {
+ t.Errorf("Devices don't match - existing: %+v returned: %+v", TestProxyLogicalDevice, d)
+ }
+ djson, _ := json.Marshal(d)
+ t.Logf("Found device: %s", string(djson))
}
+
}
func TestProxy_1_1_3_Add_NewAdapter(t *testing.T) {
TestProxyAdapterID = "test-adapter"
TestProxyAdapter.Id = TestProxyAdapterID
- preAddExecuted := make(chan struct{})
- postAddExecuted := make(chan struct{})
- preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
-
- // Register ADD instructions callbacks
- TestProxyRootAdapter.RegisterCallback(PreAdd, commonChanCallback, "PreAdd instructions for adapters", &preAddExecutedPtr)
- TestProxyRootAdapter.RegisterCallback(PostAdd, commonChanCallback, "PostAdd instructions for adapters", &postAddExecutedPtr)
// Add the adapter
- added, err := TestProxyRootAdapter.Add(context.Background(), "/adapters", TestProxyAdapter, "")
- if err != nil {
+ if err := TestProxyRootAdapter.AddWithID(context.Background(), "adapters", TestProxyAdapterID, TestProxyAdapter); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add adapter to test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if added == nil {
- t.Error("Failed to add adapter")
} else {
- t.Logf("Added adapter : %+v", added)
+ t.Logf("Added adapter : %+v", TestProxyAdapter.Id)
}
- verifyGotResponse(postAddExecuted)
-
// Verify that the added device can now be retrieved
- d, err := TestProxyRootAdapter.Get(context.Background(), "/adapters/"+TestProxyAdapterID, 0, false, "")
- if err != nil {
+ d := &voltha.Device{}
+ if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to retrieve device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Error("Failed to find added adapter")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found adapter: %s", string(djson))
}
- if !verifyGotResponse(preAddExecuted) {
- t.Error("PreAdd callback was not executed")
- }
- if !verifyGotResponse(postAddExecuted) {
- t.Error("PostAdd callback was not executed")
- }
}
func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
- devices, err := TestProxyRootDevice.Get(context.Background(), "/devices", 1, false, "")
- if err != nil {
+ var devices []*voltha.Device
+ if err := TestProxyRootDevice.List(context.Background(), "devices", &devices); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get all devices info from test proxy due to error: %v", err)
assert.NotNil(t, err)
}
- if len(devices.([]interface{})) == 0 {
+ if len(devices) == 0 {
t.Error("there are no available devices to retrieve")
} else {
// Save the target device id for later tests
- TestProxyTargetDeviceID = devices.([]interface{})[0].(*voltha.Device).Id
+ TestProxyTargetDeviceID = devices[0].Id
t.Logf("retrieved all devices: %+v", devices)
}
}
func TestProxy_1_2_2_Get_SingleDevice(t *testing.T) {
- d, err := TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyTargetDeviceID, 0, false, "")
- if err != nil {
+ d := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get single device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Errorf("Failed to find device : %s", TestProxyTargetDeviceID)
} else {
djson, _ := json.Marshal(d)
@@ -272,64 +269,42 @@
func TestProxy_1_3_1_Update_Device(t *testing.T) {
var fwVersion int
- preUpdateExecuted := make(chan struct{})
- postUpdateExecuted := make(chan struct{})
- preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
-
- retrieved, err := TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyTargetDeviceID, 1, false, "")
- if err != nil {
+ retrieved := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if retrieved == nil {
+ } else if !have {
t.Error("Failed to get device")
} else {
t.Logf("Found raw device (root proxy): %+v", retrieved)
- if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
+ if retrieved.FirmwareVersion == "n/a" {
fwVersion = 0
} else {
- fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
+ fwVersion, _ = strconv.Atoi(retrieved.FirmwareVersion)
fwVersion++
}
- retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
+ retrieved.FirmwareVersion = strconv.Itoa(fwVersion)
- TestProxyRootDevice.RegisterCallback(
- PreUpdate,
- commonChanCallback,
- "PreUpdate instructions (root proxy)", &preUpdateExecutedPtr,
- )
- TestProxyRootDevice.RegisterCallback(
- PostUpdate,
- commonChanCallback,
- "PostUpdate instructions (root proxy)", &postUpdateExecutedPtr,
- )
-
- afterUpdate, err := TestProxyRootDevice.Update(context.Background(), "/devices/"+TestProxyTargetDeviceID, retrieved, false, "")
- if err != nil {
+ if err := TestProxyRootDevice.Update(context.Background(), "devices/"+TestProxyTargetDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to update device info test proxy due to error: %v", err)
assert.NotNil(t, err)
}
- if afterUpdate == nil {
+ afterUpdate := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, afterUpdate); err != nil {
+ BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
+ } else if !have {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate)
}
- if !verifyGotResponse(preUpdateExecuted) {
- t.Error("PreUpdate callback was not executed")
- }
- if !verifyGotResponse(postUpdateExecuted) {
- t.Error("PostUpdate callback was not executed")
- }
-
- d, err := TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyTargetDeviceID, 1, false, "")
- if err != nil {
+ d := &voltha.Device{}
+ if have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyTargetDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from test proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -338,143 +313,33 @@
}
}
-func TestProxy_1_3_2_Update_DeviceFlows(t *testing.T) {
- // Get a device proxy and update a specific port
- devFlowsProxy, err := TestProxyRoot.CreateProxy(context.Background(), "/devices/"+TestProxyDeviceID+"/flows", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create device flows proxy")
- }
- flows, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- flows.(*openflow_13.Flows).Items[0].TableId = 2244
-
- preUpdateExecuted := make(chan struct{})
- postUpdateExecuted := make(chan struct{})
- preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
-
- devFlowsProxy.RegisterCallback(
- PreUpdate,
- commonChanCallback,
- "PreUpdate instructions (flows proxy)", &preUpdateExecutedPtr,
- )
- devFlowsProxy.RegisterCallback(
- PostUpdate,
- commonChanCallback,
- "PostUpdate instructions (flows proxy)", &postUpdateExecutedPtr,
- )
-
- kvFlows, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
-
- if reflect.DeepEqual(flows, kvFlows) {
- t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
- }
-
- updated, err := devFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to update flows in device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if updated == nil {
- t.Error("Failed to update flow")
- } else {
- t.Logf("Updated flows : %+v", updated)
- }
-
- if !verifyGotResponse(preUpdateExecuted) {
- t.Error("PreUpdate callback was not executed")
- }
- if !verifyGotResponse(postUpdateExecuted) {
- t.Error("PostUpdate callback was not executed")
- }
-
- d, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows in device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if d == nil {
- t.Error("Failed to find updated flows (flows proxy)")
- } else {
- djson, _ := json.Marshal(d)
- t.Logf("Found flows (flows proxy): %s", string(djson))
- }
-
- d, err = TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyDeviceID+"/flows", 1, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
- t.Error("Failed to find updated flows (root proxy)")
- } else {
- djson, _ := json.Marshal(d)
- t.Logf("Found flows (root proxy): %s", string(djson))
- }
-}
-
func TestProxy_1_3_3_Update_Adapter(t *testing.T) {
- preUpdateExecuted := make(chan struct{})
- postUpdateExecuted := make(chan struct{})
- preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- adaptersProxy, err := TestProxyRoot.CreateProxy(context.Background(), "/adapters", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create adapters proxy")
- }
- retrieved, err := TestProxyRootAdapter.Get(context.Background(), "/adapters/"+TestProxyAdapterID, 1, false, "")
- if err != nil {
+ adaptersProxy := NewProxy(mockBackend, "/adapters")
+
+ retrieved := &voltha.Adapter{}
+ if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to retrieve adapter info from adapters proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if retrieved == nil {
+ } else if !have {
t.Error("Failed to get adapter")
} else {
t.Logf("Found raw adapter (root proxy): %+v", retrieved)
- retrieved.(*voltha.Adapter).Version = "test-adapter-version-2"
+ retrieved.Version = "test-adapter-version-2"
- adaptersProxy.RegisterCallback(
- PreUpdate,
- commonChanCallback,
- "PreUpdate instructions for adapters", &preUpdateExecutedPtr,
- )
- adaptersProxy.RegisterCallback(
- PostUpdate,
- commonChanCallback,
- "PostUpdate instructions for adapters", &postUpdateExecutedPtr,
- )
-
- afterUpdate, err := adaptersProxy.Update(context.Background(), "/"+TestProxyAdapterID, retrieved, false, "")
- if err != nil {
+ if err := adaptersProxy.Update(context.Background(), TestProxyAdapterID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to update adapter info in adapters proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if afterUpdate == nil {
- t.Error("Failed to update adapter")
} else {
- t.Logf("Updated adapter : %+v", afterUpdate)
+ t.Logf("Updated adapter : %s", retrieved.Id)
}
- if !verifyGotResponse(preUpdateExecuted) {
- t.Error("PreUpdate callback for adapter was not executed")
- }
- if !verifyGotResponse(postUpdateExecuted) {
- t.Error("PostUpdate callback for adapter was not executed")
- }
-
- d, err := TestProxyRootAdapter.Get(context.Background(), "/adapters/"+TestProxyAdapterID, 1, false, "")
- if err != nil {
+ d := &voltha.Adapter{}
+ if have, err := TestProxyRootAdapter.Get(context.Background(), "adapters/"+TestProxyAdapterID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get updated adapter info from adapters proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Error("Failed to find updated adapter (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -484,45 +349,19 @@
}
func TestProxy_1_4_1_Remove_Device(t *testing.T) {
- preRemoveExecuted := make(chan struct{})
- postRemoveExecuted := make(chan struct{})
- preRemoveExecutedPtr, postRemoveExecutedPtr := preRemoveExecuted, postRemoveExecuted
-
- TestProxyRootDevice.RegisterCallback(
- PreRemove,
- commonChanCallback,
- "PreRemove instructions (root proxy)", &preRemoveExecutedPtr,
- )
- TestProxyRootDevice.RegisterCallback(
- PostRemove,
- commonChanCallback,
- "PostRemove instructions (root proxy)", &postRemoveExecutedPtr,
- )
-
- removed, err := TestProxyRootDevice.Remove(context.Background(), "/devices/"+TestProxyDeviceID, "")
- if err != nil {
+ if err := TestProxyRootDevice.Remove(context.Background(), "devices/"+TestProxyDeviceID); err != nil {
BenchmarkProxyLogger.Errorf("Failed to remove device from devices proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if removed == nil {
- t.Error("Failed to remove device")
+ t.Errorf("failed to remove device: %s", err)
} else {
- t.Logf("Removed device : %+v", removed)
+ t.Logf("Removed device : %+v", TestProxyDeviceID)
}
- if !verifyGotResponse(preRemoveExecuted) {
- t.Error("PreRemove callback was not executed")
- }
- if !verifyGotResponse(postRemoveExecuted) {
- t.Error("PostRemove callback was not executed")
- }
-
- d, err := TestProxyRootDevice.Get(context.Background(), "/devices/"+TestProxyDeviceID, 0, false, "")
+ d := &voltha.Device{}
+ have, err := TestProxyRootDevice.Get(context.Background(), "devices/"+TestProxyDeviceID, d)
if err != nil {
BenchmarkProxyLogger.Errorf("Failed to get device info from devices proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if reflect.ValueOf(d).IsValid() {
+ } else if have {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
@@ -536,82 +375,67 @@
TestProxyLogicalDeviceID = "0001" + hex.EncodeToString(ldIDBin)[:12]
TestProxyLogicalDevice.Id = TestProxyLogicalDeviceID
- preAddExecuted := make(chan struct{})
- postAddExecuted := make(chan struct{})
- preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
-
- // Register
- TestProxyRootLogicalDevice.RegisterCallback(PreAdd, commonChanCallback, "PreAdd instructions", &preAddExecutedPtr)
- TestProxyRootLogicalDevice.RegisterCallback(PostAdd, commonChanCallback, "PostAdd instructions", &postAddExecutedPtr)
-
- added, err := TestProxyRootLogicalDevice.Add(context.Background(), "/logical_devices", TestProxyLogicalDevice, "")
- if err != nil {
+ if err := TestProxyRootLogicalDevice.AddWithID(context.Background(), "logical_devices", TestProxyLogicalDeviceID, TestProxyLogicalDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add new logical device into proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if added == nil {
- t.Error("Failed to add logical device")
} else {
- t.Logf("Added logical device : %+v", added)
+ t.Logf("Added logical device : %s", TestProxyLogicalDevice.Id)
}
- verifyGotResponse(postAddExecuted)
-
- ld, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyLogicalDeviceID, 0, false, "")
- if err != nil {
+ ld := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyLogicalDeviceID, ld); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical device info from logical device proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(ld).IsValid() {
+ } else if !have {
t.Error("Failed to find added logical device")
} else {
ldJSON, _ := json.Marshal(ld)
t.Logf("Found logical device: %s", string(ldJSON))
}
-
- if !verifyGotResponse(preAddExecuted) {
- t.Error("PreAdd callback was not executed")
- }
- if !verifyGotResponse(postAddExecuted) {
- t.Error("PostAdd callback was not executed")
- }
}
func TestProxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
TestProxyLogicalDevice.Id = TestProxyLogicalDeviceID
- added, err := TestProxyRootLogicalDevice.Add(context.Background(), "/logical_devices", TestProxyLogicalDevice, "")
- if err != nil {
+ if err := TestProxyRootLogicalDevice.add(context.Background(), "logical_devices", TestProxyLogicalDevice); err != nil {
BenchmarkProxyLogger.Errorf("Failed to add logical device due to error: %v", err)
assert.NotNil(t, err)
}
- if added.(proto.Message).String() != reflect.ValueOf(TestProxyLogicalDevice).Interface().(proto.Message).String() {
- t.Errorf("Logical devices don't match - existing: %+v returned: %+v", TestProxyLogicalDevice, added)
+
+ device := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices", device); err != nil {
+ BenchmarkProxyLogger.Errorf("Failed to get logical device info from logical device proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ } else if !have {
+ t.Error("Failed to find added logical device")
+ } else {
+ if device.String() != TestProxyLogicalDevice.String() {
+ t.Errorf("Logical devices don't match - existing: %+v returned: %+v", TestProxyLogicalDevice, device)
+ }
}
}
func TestProxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
- logicalDevices, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices", 1, false, "")
- if err != nil {
+ var logicalDevices []*voltha.LogicalDevice
+ if err := TestProxyRootLogicalDevice.List(context.Background(), "logical_devices", &logicalDevices); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get all logical devices from proxy due to error: %v", err)
assert.NotNil(t, err)
}
- if len(logicalDevices.([]interface{})) == 0 {
+ if len(logicalDevices) == 0 {
t.Error("there are no available logical devices to retrieve")
} else {
// Save the target device id for later tests
- TestProxyTargetLogicalDeviceID = logicalDevices.([]interface{})[0].(*voltha.LogicalDevice).Id
+ TestProxyTargetLogicalDeviceID = logicalDevices[0].Id
t.Logf("retrieved all logical devices: %+v", logicalDevices)
}
}
func TestProxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
- ld, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyTargetLogicalDeviceID, 0, false, "")
- if err != nil {
+ ld := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, ld); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get single logical device from proxy due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(ld).IsValid() {
+ } else if !have {
t.Errorf("Failed to find logical device : %s", TestProxyTargetLogicalDeviceID)
} else {
ldJSON, _ := json.Marshal(ld)
@@ -622,222 +446,61 @@
func TestProxy_2_3_1_Update_LogicalDevice(t *testing.T) {
var fwVersion int
- preUpdateExecuted := make(chan struct{})
- postUpdateExecuted := make(chan struct{})
- preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- retrieved, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyTargetLogicalDeviceID, 1, false, "")
- if err != nil {
+ retrieved := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical devices due to error: %v", err)
assert.NotNil(t, err)
- }
- if retrieved == nil {
+ } else if !have {
t.Error("Failed to get logical device")
} else {
t.Logf("Found raw logical device (root proxy): %+v", retrieved)
- if retrieved.(*voltha.LogicalDevice).RootDeviceId == "" {
+ if retrieved.RootDeviceId == "" {
fwVersion = 0
} else {
- fwVersion, _ = strconv.Atoi(retrieved.(*voltha.LogicalDevice).RootDeviceId)
+ fwVersion, _ = strconv.Atoi(retrieved.RootDeviceId)
fwVersion++
}
- TestProxyRootLogicalDevice.RegisterCallback(
- PreUpdate,
- commonChanCallback,
- "PreUpdate instructions (root proxy)", &preUpdateExecutedPtr,
- )
- TestProxyRootLogicalDevice.RegisterCallback(
- PostUpdate,
- commonChanCallback,
- "PostUpdate instructions (root proxy)", &postUpdateExecutedPtr,
- )
+ retrieved.RootDeviceId = strconv.Itoa(fwVersion)
- retrieved.(*voltha.LogicalDevice).RootDeviceId = strconv.Itoa(fwVersion)
-
- afterUpdate, err := TestProxyRootLogicalDevice.Update(context.Background(), "/logical_devices/"+TestProxyTargetLogicalDeviceID, retrieved, false,
- "")
- if err != nil {
+ if err := TestProxyRootLogicalDevice.Update(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, retrieved); err != nil {
BenchmarkProxyLogger.Errorf("Faield to update logical device info due to error: %v", err)
assert.NotNil(t, err)
- }
- if afterUpdate == nil {
- t.Error("Failed to update logical device")
} else {
- t.Logf("Updated logical device : %+v", afterUpdate)
+ t.Log("Updated logical device")
}
- if !verifyGotResponse(preUpdateExecuted) {
- t.Error("PreUpdate callback was not executed")
- }
- if !verifyGotResponse(postUpdateExecuted) {
- t.Error("PostUpdate callback was not executed")
- }
-
- d, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyTargetLogicalDeviceID, 1, false, "")
- if err != nil {
+ d := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyTargetLogicalDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical device info due to error: %v", err)
assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
+ } else if !have {
t.Error("Failed to find updated logical device (root proxy)")
} else {
djson, _ := json.Marshal(d)
-
t.Logf("Found logical device (root proxy): %s raw: %+v", string(djson), d)
}
}
}
-func TestProxy_2_3_2_Update_LogicalDeviceFlows(t *testing.T) {
- // Get a device proxy and update a specific port
- ldFlowsProxy, err := TestProxyRoot.CreateProxy(context.Background(), "/logical_devices/"+TestProxyLogicalDeviceID+"/flows", false)
- if err != nil {
- log.With(log.Fields{"error": err}).Fatal("Failed to create logical device flows proxy")
- }
- flows, err := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from logical device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- flows.(*openflow_13.Flows).Items[0].TableId = rand.Uint32()
- t.Logf("before updated flows: %+v", flows)
-
- ldFlowsProxy.RegisterCallback(
- PreUpdate,
- commonCallback2,
- )
- ldFlowsProxy.RegisterCallback(
- PostUpdate,
- commonCallback2,
- )
-
- kvFlows, err := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Faield to get flows from logical device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if reflect.DeepEqual(flows, kvFlows) {
- t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
- }
-
- updated, err := ldFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to update flows in logical device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if updated == nil {
- t.Error("Failed to update logical device flows")
- } else {
- t.Logf("Updated logical device flows : %+v", updated)
- }
-
- if d, _ := ldFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
- t.Error("Failed to find updated logical device flows (flows proxy)")
- } else {
- djson, _ := json.Marshal(d)
- t.Logf("Found flows (flows proxy): %s", string(djson))
- }
-
- d, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyLogicalDeviceID+"/flows", 0, false,
- "")
- if err != nil {
- BenchmarkProxyLogger.Errorf("Failed to get flows from logical device flows proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if !reflect.ValueOf(d).IsValid() {
- t.Error("Failed to find updated logical device flows (root proxy)")
- } else {
- djson, _ := json.Marshal(d)
- t.Logf("Found logical device flows (root proxy): %s", string(djson))
- }
-}
-
func TestProxy_2_4_1_Remove_Device(t *testing.T) {
- preRemoveExecuted := make(chan struct{})
- postRemoveExecuted := make(chan struct{})
- preRemoveExecutedPtr, postRemoveExecutedPtr := preRemoveExecuted, postRemoveExecuted
-
- TestProxyRootLogicalDevice.RegisterCallback(
- PreRemove,
- commonChanCallback,
- "PreRemove instructions (root proxy)", &preRemoveExecutedPtr,
- )
- TestProxyRootLogicalDevice.RegisterCallback(
- PostRemove,
- commonChanCallback,
- "PostRemove instructions (root proxy)", &postRemoveExecutedPtr,
- )
-
- removed, err := TestProxyRootLogicalDevice.Remove(context.Background(), "/logical_devices/"+TestProxyLogicalDeviceID, "")
- if err != nil {
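+ // Remove returns only an error; the Get further down verifies the entry is gone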
+ if err := TestProxyRootLogicalDevice.Remove(context.Background(), "logical_devices/"+TestProxyLogicalDeviceID); err != nil {
BenchmarkProxyLogger.Errorf("Failed to remove device from logical devices proxy due to error: %v", err)
- assert.NotNil(t, err)
- }
- if removed == nil {
- t.Error("Failed to remove logical device")
+ t.Errorf("Failed to remove logical device: %s", err)
} else {
- t.Logf("Removed device : %+v", removed)
+ t.Logf("Removed device : %+v", TestProxyLogicalDeviceID)
}
- if !verifyGotResponse(preRemoveExecuted) {
- t.Error("PreRemove callback was not executed")
- }
- if !verifyGotResponse(postRemoveExecuted) {
- t.Error("PostRemove callback was not executed")
- }
-
- d, err := TestProxyRootLogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxyLogicalDeviceID, 0, false, "")
- if err != nil {
+ d := &voltha.LogicalDevice{}
+ if have, err := TestProxyRootLogicalDevice.Get(context.Background(), "logical_devices/"+TestProxyLogicalDeviceID, d); err != nil {
BenchmarkProxyLogger.Errorf("Failed to get logical device info due to error: %v", err)
assert.NotNil(t, err)
- }
- if reflect.ValueOf(d).IsValid() {
+ } else if have {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
t.Logf("Device was removed: %s", TestProxyLogicalDeviceID)
}
}
-
-// -----------------------------
-// Callback tests
-// -----------------------------
-
-func TestProxy_Callbacks_1_Register(t *testing.T) {
- TestProxyRootDevice.RegisterCallback(PreAdd, firstCallback, "abcde", "12345")
-
- m := make(map[string]string)
- m["name"] = "fghij"
- TestProxyRootDevice.RegisterCallback(PreAdd, secondCallback, m, 1.2345)
-
- d := &voltha.Device{Id: "12345"}
- TestProxyRootDevice.RegisterCallback(PreAdd, thirdCallback, "klmno", d)
-}
-
-func TestProxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- TestProxyRootDevice.InvokeCallbacks(ctx, PreAdd, false, nil)
-}
-
-func TestProxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- TestProxyRootDevice.InvokeCallbacks(ctx, PreAdd, true, nil)
-}
-
-func TestProxy_Callbacks_4_Unregister(t *testing.T) {
- TestProxyRootDevice.UnregisterCallback(PreAdd, firstCallback)
- TestProxyRootDevice.UnregisterCallback(PreAdd, secondCallback)
- TestProxyRootDevice.UnregisterCallback(PreAdd, thirdCallback)
-}
-
-//func TestProxy_Callbacks_5_Add(t *testing.T) {
-// TestProxyRootDevice.Root.AddCallback(TestProxyRootDevice.InvokeCallbacks, PostUpdate, false, "some data", "some new data")
-//}
-//
-//func TestProxy_Callbacks_6_Execute(t *testing.T) {
-// TestProxyRootDevice.Root.ExecuteCallbacks()
-//}
diff --git a/db/model/revision.go b/db/model/revision.go
deleted file mode 100644
index 77c1c02..0000000
--- a/db/model/revision.go
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "context"
- "time"
-
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-)
-
-// Revision -
-type Revision interface {
- Finalize(context.Context, bool)
- SetConfig(revision *DataRevision)
- GetConfig() *DataRevision
- Drop(txid string, includeConfig bool)
- StorageDrop(ctx context.Context, txid string, includeConfig bool)
- ChildDrop(childType string, childHash string)
- ChildDropByName(childName string)
- SetChildren(name string, children []Revision)
- GetChildren(name string) []Revision
- SetAllChildren(children map[string][]Revision)
- GetAllChildren() map[string][]Revision
- SetHash(hash string)
- GetHash() string
- ClearHash()
- getVersion() int64
- SetupWatch(ctx context.Context, key string)
- SetName(name string)
- GetName() string
- SetBranch(branch *Branch)
- GetBranch() *Branch
- Get(int) interface{}
- GetData() interface{}
- getNode() *node
- SetLastUpdate(ts ...time.Time)
- GetLastUpdate() time.Time
- LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error)
- UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision
- UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision
- UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision
-}
diff --git a/db/model/root.go b/db/model/root.go
deleted file mode 100644
index c3b932e..0000000
--- a/db/model/root.go
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "context"
- "encoding/hex"
- "encoding/json"
- "reflect"
- "sync"
-
- "github.com/golang/protobuf/proto"
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v3/pkg/db"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
-)
-
-// Root is used to provide an abstraction to the base root structure
-type Root interface {
- Node
-
- ExecuteCallbacks(ctx context.Context)
- AddCallback(callback CallbackFunction, args ...interface{})
- AddNotificationCallback(callback CallbackFunction, args ...interface{})
-}
-
-// root points to the top of the data model tree or sub-tree identified by a proxy
-type root struct {
- *node
-
- Callbacks []CallbackTuple
- NotificationCallbacks []CallbackTuple
-
- DirtyNodes map[string][]*node
- KvStore *db.Backend
- Loading bool
- RevisionClass interface{}
-
- mutex sync.RWMutex
-}
-
-// NewRoot creates an new instance of a root object
-func NewRoot(initialData interface{}, kvStore *db.Backend) Root {
- root := &root{}
-
- root.KvStore = kvStore
- root.DirtyNodes = make(map[string][]*node)
- root.Loading = false
-
- // If there is no storage in place just revert to
- // a non persistent mechanism
- if kvStore != nil {
- root.RevisionClass = reflect.TypeOf(PersistedRevision{})
- } else {
- root.RevisionClass = reflect.TypeOf(NonPersistedRevision{})
- }
-
- root.Callbacks = []CallbackTuple{}
- root.NotificationCallbacks = []CallbackTuple{}
-
- root.node = newNode(root, initialData, false, "")
-
- return root
-}
-
-// MakeTxBranch creates a new transaction branch
-func (r *root) MakeTxBranch() string {
- txidBin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txidBin)[:12]
-
- r.DirtyNodes[txid] = []*node{r.node}
- r.node.MakeBranch(txid)
-
- return txid
-}
-
-// DeleteTxBranch removes a transaction branch
-func (r *root) DeleteTxBranch(txid string) {
- for _, dirtyNode := range r.DirtyNodes[txid] {
- dirtyNode.DeleteBranch(txid)
- }
- delete(r.DirtyNodes, txid)
- r.node.DeleteBranch(txid)
-}
-
-// FoldTxBranch will merge the contents of a transaction branch with the root object
-func (r *root) FoldTxBranch(ctx context.Context, txid string) {
- // Start by doing a dry run of the merge
- // If that fails, it bails out and the branch is deleted
- if _, err := r.node.MergeBranch(ctx, txid, true); err != nil {
- // Merge operation fails
- r.DeleteTxBranch(txid)
- } else {
- if _, err = r.node.MergeBranch(ctx, txid, false); err != nil {
- logger.Errorw("Unable to integrate the contents of a transaction branch within the latest branch of a given node", log.Fields{"error": err})
- }
- r.node.GetRoot().ExecuteCallbacks(ctx)
- r.DeleteTxBranch(txid)
- }
-}
-
-// ExecuteCallbacks will invoke all the callbacks linked to root object
-func (r *root) ExecuteCallbacks(ctx context.Context) {
- r.mutex.Lock()
- defer r.mutex.Unlock()
-
- for len(r.Callbacks) > 0 {
- callback := r.Callbacks[0]
- r.Callbacks = r.Callbacks[1:]
- go callback.Execute(ctx, nil)
- }
- //for len(r.NotificationCallbacks) > 0 {
- // callback := r.NotificationCallbacks[0]
- // r.NotificationCallbacks = r.NotificationCallbacks[1:]
- // go callback.Execute(nil)
- //}
-}
-
-// getCallbacks returns the available callbacks
-func (r *root) GetCallbacks() []CallbackTuple {
- r.mutex.Lock()
- defer r.mutex.Unlock()
-
- return r.Callbacks
-}
-
-// getCallbacks returns the available notification callbacks
-func (r *root) GetNotificationCallbacks() []CallbackTuple {
- r.mutex.Lock()
- defer r.mutex.Unlock()
-
- return r.NotificationCallbacks
-}
-
-// AddCallback inserts a new callback with its arguments
-func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
- r.mutex.Lock()
- defer r.mutex.Unlock()
-
- r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
-}
-
-// AddNotificationCallback inserts a new notification callback with its arguments
-func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
- r.mutex.Lock()
- defer r.mutex.Unlock()
-
- r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
-}
-
-func (r *root) syncParent(ctx context.Context, childRev Revision, txid string) {
- data := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))
-
- for fieldName := range ChildrenFields(data) {
- childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
- if reflect.TypeOf(childRev.GetData()) == reflect.TypeOf(childDataHolder.Interface()) {
- childDataHolder = reflect.ValueOf(childRev.GetData())
- reflect.ValueOf(data).Elem().FieldByName(childDataName).Set(childDataHolder)
- }
- }
-
- r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), data))
- r.GetProxy().ParentNode.Latest(txid).Finalize(ctx, false)
-}
-
-// Update modifies the content of an object at a given path with the provided data
-func (r *root) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
- var result Revision
-
- if txid != "" {
- trackDirty := func(node *node) *Branch {
- r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
- return node.MakeBranch(txid)
- }
- result = r.node.Update(ctx, path, data, strict, txid, trackDirty)
- } else {
- result = r.node.Update(ctx, path, data, strict, "", nil)
- }
-
- if result != nil {
- if r.GetProxy().FullPath != r.GetProxy().Path {
- r.syncParent(ctx, result, txid)
- } else {
- result.Finalize(ctx, false)
- }
- }
-
- r.node.GetRoot().ExecuteCallbacks(ctx)
-
- return result
-}
-
-// Add creates a new object at the given path with the provided data
-func (r *root) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
- var result Revision
-
- if txid != "" {
- trackDirty := func(node *node) *Branch {
- r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
- return node.MakeBranch(txid)
- }
- result = r.node.Add(ctx, path, data, txid, trackDirty)
- } else {
- result = r.node.Add(ctx, path, data, "", nil)
- }
-
- if result != nil {
- result.Finalize(ctx, true)
- r.node.GetRoot().ExecuteCallbacks(ctx)
- }
- return result
-}
-
-// Remove discards an object at a given path
-func (r *root) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
- var result Revision
-
- if txid != "" {
- trackDirty := func(node *node) *Branch {
- r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
- return node.MakeBranch(txid)
- }
- result = r.node.Remove(ctx, path, txid, trackDirty)
- } else {
- result = r.node.Remove(ctx, path, "", nil)
- }
-
- r.node.GetRoot().ExecuteCallbacks(ctx)
-
- return result
-}
-
-// MakeLatest updates a branch with the latest node revision
-func (r *root) MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- r.makeLatest(ctx, branch, revision, changeAnnouncement)
-}
-
-func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- if r.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
- return NewPersistedRevision(branch, data, children)
- }
-
- return NewNonPersistedRevision(r, branch, data, children)
-}
-
-func (r *root) makeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- r.node.makeLatest(branch, revision, changeAnnouncement)
-
- if r.KvStore != nil && branch.Txid == "" {
- tags := make(map[string]string)
- for k, v := range r.node.Tags {
- tags[k] = v.GetHash()
- }
- data := &rootData{
- Latest: branch.GetLatest().GetHash(),
- Tags: tags,
- }
- if blob, err := json.Marshal(data); err != nil {
- // TODO report error
- } else {
- logger.Debugf("Changing root to : %s", string(blob))
- if err := r.KvStore.Put(ctx, "root", blob); err != nil {
- logger.Errorf("failed to properly put value in kvstore - err: %s", err.Error())
- }
- }
- }
-}
-
-type rootData struct {
- Latest string `json:"latest"`
- Tags map[string]string `json:"tags"`
-}
diff --git a/db/model/transaction.go b/db/model/transaction.go
deleted file mode 100644
index 670525d..0000000
--- a/db/model/transaction.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "context"
- "fmt"
-)
-
-// Transaction -
-type Transaction struct {
- proxy *Proxy
- txid string
-}
-
-// NewTransaction -
-func NewTransaction(proxy *Proxy, txid string) *Transaction {
- tx := &Transaction{
- proxy: proxy,
- txid: txid,
- }
- return tx
-}
-func (t *Transaction) Get(ctx context.Context, path string, depth int, deep bool) (interface{}, error) {
- if t.txid == "" {
- logger.Errorf("closed transaction")
- return nil, fmt.Errorf("closed transaction")
- }
- // TODO: need to review the return values at the different layers!!!!!
- return t.proxy.Get(ctx, path, depth, deep, t.txid)
-}
-func (t *Transaction) Update(ctx context.Context, path string, data interface{}, strict bool) (interface{}, error) {
- if t.txid == "" {
- logger.Errorf("closed transaction")
- return nil, fmt.Errorf("closed transaction")
- }
- return t.proxy.Update(ctx, path, data, strict, t.txid)
-}
-func (t *Transaction) Add(ctx context.Context, path string, data interface{}) (interface{}, error) {
- if t.txid == "" {
- logger.Errorf("closed transaction")
- return nil, fmt.Errorf("closed transaction")
- }
- return t.proxy.Add(ctx, path, data, t.txid)
-}
-func (t *Transaction) Remove(ctx context.Context, path string) (interface{}, error) {
- if t.txid == "" {
- logger.Errorf("closed transaction")
- return nil, fmt.Errorf("closed transaction")
- }
- return t.proxy.Remove(ctx, path, t.txid)
-}
-
-// Cancel -
-func (t *Transaction) Cancel() {
- t.proxy.cancelTransaction(t.txid)
- t.txid = ""
-}
-
-// Commit -
-func (t *Transaction) Commit(ctx context.Context) {
- t.proxy.commitTransaction(ctx, t.txid)
- t.txid = ""
-}
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
deleted file mode 100644
index 4e1346b..0000000
--- a/db/model/transaction_test.go
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "context"
- "encoding/hex"
- "strconv"
- "testing"
- "time"
-
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-protos/v3/go/common"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/stretchr/testify/assert"
-)
-
-var (
- TestTransactionRoot Root
- TestTransactionRootProxy *Proxy
- TestTransactionTargetDeviceID string
- TestTransactionDeviceID string
-)
-
-func init() {
- var err error
- TestTransactionRoot = NewRoot(&voltha.Voltha{}, nil)
- if TestTransactionRootProxy, err = TestTransactionRoot.CreateProxy(context.Background(), "/", false); err != nil {
- log.With(log.Fields{"error": err}).Fatal("Cannot create proxy")
- }
-}
-
-func TestTransaction_2_AddDevice(t *testing.T) {
- devIDBin, _ := uuid.New().MarshalBinary()
- TestTransactionDeviceID = "0001" + hex.EncodeToString(devIDBin)[:12]
-
- ports := []*voltha.Port{
- {
- PortNo: 123,
- Label: "test-port-0",
- Type: voltha.Port_PON_OLT,
- AdminState: common.AdminState_ENABLED,
- OperStatus: common.OperStatus_ACTIVE,
- DeviceId: "etcd_port-0-device-id",
- Peers: []*voltha.Port_PeerPort{},
- },
- }
-
- device := &voltha.Device{
- Id: TestTransactionDeviceID,
- Type: "simulated_olt",
- Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
- AdminState: voltha.AdminState_PREPROVISIONED,
- Ports: ports,
- }
-
- addTx := TestTransactionRootProxy.OpenTransaction()
-
- added, err := addTx.Add(context.Background(), "/devices", device)
- if err != nil {
- logger.Errorf("Failed to add device due to error %v", err)
- assert.NotNil(t, err)
- }
- if added == nil {
- t.Error("Failed to add device")
- } else {
- TestTransactionTargetDeviceID = added.(*voltha.Device).Id
- t.Logf("Added device : %+v", added)
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- addTx.Commit(ctx)
-}
-
-func TestTransaction_3_GetDevice_PostAdd(t *testing.T) {
-
- basePath := "/devices/" + TestTransactionDeviceID
-
- getDevWithPortsTx := TestTransactionRootProxy.OpenTransaction()
- device1, err := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
- if err != nil {
- logger.Errorf("Failed to get device with ports due to error %v", err)
- assert.NotNil(t, err)
- }
- t.Logf("retrieved device with ports: %+v", device1)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- getDevWithPortsTx.Commit(ctx)
-
- getDevTx := TestTransactionRootProxy.OpenTransaction()
- device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
- if err != nil {
- logger.Errorf("Failed to open transaction due to error %v", err)
- assert.NotNil(t, err)
- }
- t.Logf("retrieved device: %+v", device2)
-
- getDevTx.Commit(ctx)
-}
-
-func TestTransaction_4_UpdateDevice(t *testing.T) {
- updateTx := TestTransactionRootProxy.OpenTransaction()
- if retrieved, err := updateTx.Get(context.Background(), "/devices/"+TestTransactionTargetDeviceID, 1, false); err != nil {
- logger.Errorf("Failed to retrieve device info due to error %v", err)
- assert.NotNil(t, err)
- } else if retrieved == nil {
- t.Error("Failed to get device")
- } else {
- var fwVersion int
- if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
- fwVersion = 0
- } else {
- fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
- fwVersion++
- }
-
- //cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
- retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
- t.Logf("Before update : %+v", retrieved)
-
- // FIXME: The makeBranch passed in function is nil or not being executed properly!!!!!
- afterUpdate, err := updateTx.Update(context.Background(), "/devices/"+TestTransactionTargetDeviceID, retrieved, false)
- if err != nil {
- logger.Errorf("Failed to update device info due to error %v", err)
- assert.NotNil(t, err)
- }
- if afterUpdate == nil {
- t.Error("Failed to update device")
- } else {
- t.Logf("Updated device : %+v", afterUpdate)
- }
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- updateTx.Commit(ctx)
-}
-
-func TestTransaction_5_GetDevice_PostUpdate(t *testing.T) {
-
- basePath := "/devices/" + TestTransactionDeviceID
-
- getDevWithPortsTx := TestTransactionRootProxy.OpenTransaction()
- device1, err := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
- if err != nil {
- logger.Errorf("Failed to device with ports info due to error %v", err)
- assert.NotNil(t, err)
- }
- t.Logf("retrieved device with ports: %+v", device1)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- getDevWithPortsTx.Commit(ctx)
-
- getDevTx := TestTransactionRootProxy.OpenTransaction()
- device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
- if err != nil {
- logger.Errorf("Failed to get device info due to error %v", err)
- assert.NotNil(t, err)
- }
- t.Logf("retrieved device: %+v", device2)
-
- getDevTx.Commit(ctx)
-}
-
-func TestTransaction_6_RemoveDevice(t *testing.T) {
- removeTx := TestTransactionRootProxy.OpenTransaction()
- removed, err := removeTx.Remove(context.Background(), "/devices/"+TestTransactionDeviceID)
- if err != nil {
- logger.Errorf("Failed to remove device due to error %v", err)
- assert.NotNil(t, err)
- }
- if removed == nil {
- t.Error("Failed to remove device")
- } else {
- t.Logf("Removed device : %+v", removed)
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- removeTx.Commit(ctx)
-}
-
-func TestTransaction_7_GetDevice_PostRemove(t *testing.T) {
-
- basePath := "/devices/" + TestTransactionDeviceID
-
- getDevTx := TestTransactionRootProxy.OpenTransaction()
- device, err := TestTransactionRootProxy.Get(context.Background(), basePath, 0, false, "")
- if err != nil {
- logger.Errorf("Failed to get device info post remove due to error %v", err)
- assert.NotNil(t, err)
- }
- t.Logf("retrieved device: %+v", device)
-
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- getDevTx.Commit(ctx)
-}
diff --git a/db/model/utils.go b/db/model/utils.go
deleted file mode 100644
index 6d1b2d5..0000000
--- a/db/model/utils.go
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "reflect"
- "strings"
-)
-
-// IsProtoMessage determines if the specified implements proto.Message type
-func IsProtoMessage(object interface{}) bool {
- var ok = false
-
- if object != nil {
- st := reflect.TypeOf(object)
- _, ok = st.MethodByName("ProtoMessage")
- }
- return ok
-}
-
-// FindOwnerType will traverse a data structure and find the parent type of the specified object
-func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
- prefix := ""
- for d := 0; d < depth; d++ {
- prefix += ">>"
- }
- k := obj.Kind()
- switch k {
- case reflect.Ptr:
- if found {
- return obj.Type()
- }
-
- t := obj.Type().Elem()
- n := reflect.New(t)
-
- if rc := FindOwnerType(n.Elem(), name, depth+1, found); rc != nil {
- return rc
- }
-
- case reflect.Struct:
- if found {
- return obj.Type()
- }
-
- for i := 0; i < obj.NumField(); i++ {
- v := reflect.Indirect(obj)
-
- json := strings.Split(v.Type().Field(i).Tag.Get("json"), ",")
-
- if json[0] == name {
- return FindOwnerType(obj.Field(i), name, depth+1, true)
- }
-
- if rc := FindOwnerType(obj.Field(i), name, depth+1, found); rc != nil {
- return rc
- }
- }
- case reflect.Slice:
- s := reflect.MakeSlice(obj.Type(), 1, 1)
- n := reflect.New(obj.Type())
- n.Elem().Set(s)
-
- for i := 0; i < n.Elem().Len(); i++ {
- if found {
- return reflect.ValueOf(n.Elem().Index(i).Interface()).Type()
- }
- }
-
- for i := 0; i < obj.Len(); i++ {
- if found {
- return obj.Index(i).Type()
- }
-
- if rc := FindOwnerType(obj.Index(i), name, depth+1, found); rc != nil {
- return rc
- }
- }
- default:
- }
-
- return nil
-}
-
-// FindKeyOwner will traverse a structure to find the owner type of the specified name
-func FindKeyOwner(iface interface{}, name string, depth int) interface{} {
- obj := reflect.ValueOf(iface)
- k := obj.Kind()
- switch k {
- case reflect.Ptr:
- t := obj.Type().Elem()
- n := reflect.New(t)
-
- if rc := FindKeyOwner(n.Elem().Interface(), name, depth+1); rc != nil {
- return rc
- }
-
- case reflect.Struct:
- for i := 0; i < obj.NumField(); i++ {
- json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
-
- if json[0] == name {
- return obj.Type().Field(i).Type
- }
-
- if rc := FindKeyOwner(obj.Field(i).Interface(), name, depth+1); rc != nil {
- return rc
- }
- }
-
- case reflect.Slice:
- s := reflect.MakeSlice(obj.Type(), 1, 1)
- n := reflect.New(obj.Type())
- n.Elem().Set(s)
-
- for i := 0; i < n.Elem().Len(); i++ {
- if rc := FindKeyOwner(n.Elem().Index(i).Interface(), name, depth+1); rc != nil {
- return rc
- }
- }
- default:
- }
-
- return nil
-}
-
-// GetAttributeValue traverse a structure to find the value of an attribute
-// FIXME: Need to figure out if GetAttributeValue and GetAttributeStructure can become one
-// Code is repeated in both, but outputs have a different purpose
-// Left as-is for now to get things working
-func GetAttributeValue(data interface{}, name string, depth int) (string, reflect.Value) {
- var attribName string
- var attribValue reflect.Value
- obj := reflect.ValueOf(data)
-
- if !obj.IsValid() {
- return attribName, attribValue
- }
-
- k := obj.Kind()
- switch k {
- case reflect.Ptr:
- if obj.IsNil() {
- return attribName, attribValue
- }
-
- if attribName, attribValue = GetAttributeValue(obj.Elem().Interface(), name, depth+1); attribValue.IsValid() {
- return attribName, attribValue
- }
-
- case reflect.Struct:
- for i := 0; i < obj.NumField(); i++ {
- json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
-
- if json[0] == name {
- return obj.Type().Field(i).Name, obj.Field(i)
- }
-
- if obj.Field(i).IsValid() {
- if attribName, attribValue = GetAttributeValue(obj.Field(i).Interface(), name, depth+1); attribValue.IsValid() {
- return attribName, attribValue
- }
- }
- }
-
- case reflect.Slice:
- s := reflect.MakeSlice(obj.Type(), 1, 1)
- n := reflect.New(obj.Type())
- n.Elem().Set(s)
-
- for i := 0; i < obj.Len(); i++ {
- if attribName, attribValue = GetAttributeValue(obj.Index(i).Interface(), name, depth+1); attribValue.IsValid() {
- return attribName, attribValue
- }
- }
- default:
- }
-
- return attribName, attribValue
-
-}
-
-// GetAttributeStructure will traverse a structure to find the data structure for the named attribute
-// FIXME: See GetAttributeValue(...) comment
-func GetAttributeStructure(data interface{}, name string, depth int) reflect.StructField {
- var result reflect.StructField
- obj := reflect.ValueOf(data)
-
- if !obj.IsValid() {
- return result
- }
-
- k := obj.Kind()
- switch k {
- case reflect.Ptr:
- t := obj.Type().Elem()
- n := reflect.New(t)
-
- if rc := GetAttributeStructure(n.Elem().Interface(), name, depth+1); rc.Name != "" {
- return rc
- }
-
- case reflect.Struct:
- for i := 0; i < obj.NumField(); i++ {
- v := reflect.Indirect(obj)
- json := strings.Split(obj.Type().Field(i).Tag.Get("json"), ",")
-
- if json[0] == name {
- return v.Type().Field(i)
- }
-
- if obj.Field(i).IsValid() {
- if rc := GetAttributeStructure(obj.Field(i).Interface(), name, depth+1); rc.Name != "" {
- return rc
- }
- }
- }
-
- case reflect.Slice:
- s := reflect.MakeSlice(obj.Type(), 1, 1)
- n := reflect.New(obj.Type())
- n.Elem().Set(s)
-
- for i := 0; i < obj.Len(); i++ {
- if rc := GetAttributeStructure(obj.Index(i).Interface(), name, depth+1); rc.Name != "" {
- return rc
- }
-
- }
- default:
- }
-
- return result
-
-}
diff --git a/db/model/utils_test.go b/db/model/utils_test.go
deleted file mode 100644
index 7b6f5cf..0000000
--- a/db/model/utils_test.go
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-protos/v3/go/common"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/stretchr/testify/assert"
- "reflect"
- "testing"
-)
-
-const (
- testValidValPorts = "ports"
- testValidValItems = "items"
- testInvalidVal = "invalid_val"
- wantResultPorts = "Ports"
- wantResultItems = "Items"
-)
-
-var (
- TestUtilsPort = []*voltha.Port{
- {
- PortNo: 123,
- Label: "test-etcd_port-0",
- Type: voltha.Port_PON_OLT,
- AdminState: common.AdminState_ENABLED,
- OperStatus: common.OperStatus_ACTIVE,
- DeviceId: "etcd_port-0-device-id",
- Peers: []*voltha.Port_PeerPort{},
- },
- }
-
- TestUtilsStats = &openflow_13.OfpFlowStats{
- Id: 1111,
- }
-
- TestUtilsFlows = &openflow_13.Flows{
- Items: []*openflow_13.OfpFlowStats{TestProxyStats},
- }
-
- TestUtilsDevice = &voltha.Device{
- Id: "Config-Node-1",
- Type: "simulated_olt",
- Root: true,
- ParentId: "",
- ParentPortNo: 0,
- Vendor: "voltha-test",
- Model: "Modelxx",
- HardwareVersion: "0.0.1",
- FirmwareVersion: "0.0.1",
- Images: &voltha.Images{},
- SerialNumber: "1234567890",
- VendorId: "XXBB-INC",
- Adapter: "simulated_olt",
- Vlan: 1234,
- Address: &voltha.Device_HostAndPort{HostAndPort: "127.0.0.1:4321"},
- ExtraArgs: "",
- ProxyAddress: &voltha.Device_ProxyAddress{},
- AdminState: voltha.AdminState_PREPROVISIONED,
- OperStatus: common.OperStatus_ACTIVE,
- Reason: "",
- ConnectStatus: common.ConnectStatus_REACHABLE,
- Custom: &any.Any{},
- Ports: TestUtilsPort,
- Flows: TestUtilsFlows,
- FlowGroups: &openflow_13.FlowGroups{},
- PmConfigs: &voltha.PmConfigs{},
- ImageDownloads: []*voltha.ImageDownload{},
- }
-
- TestUtilsEmpty interface{}
-)
-
-func TestIsProtoMessage(t *testing.T) {
- result := IsProtoMessage(TestUtilsDevice)
- assert.True(t, result)
-
- result1 := IsProtoMessage(TestUtilsEmpty)
- assert.False(t, result1)
-}
-
-func TestFindOwnerType(t *testing.T) {
- result := FindOwnerType(reflect.ValueOf(TestUtilsDevice), testValidValPorts, 0, false)
- if result != reflect.TypeOf(&voltha.Port{}) {
- t.Errorf("TestFindOwnerType: Case0: result: %v, expected: %v", result, reflect.TypeOf(&voltha.Port{}))
- }
-
- result1 := FindOwnerType(reflect.ValueOf(TestUtilsDevice), testValidValItems, 1, false)
- if result1 != reflect.TypeOf(&openflow_13.OfpFlowStats{}) {
- t.Errorf("TestFindOwnerType: Case1: result: %v, expected: %v", result1, reflect.TypeOf(&openflow_13.OfpFlowStats{}))
- }
-
- result2 := FindOwnerType(reflect.ValueOf(TestUtilsDevice), testInvalidVal, 1, false)
- assert.Nil(t, result2)
-}
-
-func TestKeyOwner(t *testing.T) {
- result := FindKeyOwner(TestUtilsDevice, testValidValPorts, 0)
- if result != reflect.TypeOf(TestUtilsPort) {
- t.Errorf("TestKeyOwner: Case0: result: %v, expected: %v", result, reflect.TypeOf(TestUtilsPort))
- }
-
- result1 := FindKeyOwner(TestUtilsDevice, testValidValItems, 1)
- if result1 != reflect.TypeOf(TestUtilsFlows.Items) {
- t.Errorf("TestKeyOwner: Case1: result: %v, expected: %v", result1, reflect.TypeOf(TestUtilsFlows.Items))
- }
-
- result2 := FindKeyOwner(TestUtilsDevice, testInvalidVal, 1)
- assert.Nil(t, result2)
-}
-
-func TestGetAttributeValue(t *testing.T) {
- result1, result2 := GetAttributeValue(TestUtilsDevice, testValidValPorts, 0)
- assert.Equal(t, wantResultPorts, result1)
- assert.Equal(t, reflect.ValueOf(TestUtilsPort).Index(0), result2.Index(0))
-
- result3, _ := GetAttributeValue(TestUtilsDevice, testValidValItems, 1)
- assert.Equal(t, wantResultItems, result3)
-
- result4, _ := GetAttributeValue(TestUtilsDevice, testInvalidVal, 1)
- assert.Empty(t, result4)
-}
-
-func TestGetAttributeStructure(t *testing.T) {
- result := GetAttributeStructure(TestUtilsDevice, testValidValPorts, 0)
- assert.Equal(t, wantResultPorts, result.Name)
- assert.Equal(t, reflect.TypeOf(TestUtilsPort), result.Type)
-
- result1 := GetAttributeStructure(TestUtilsDevice, testValidValItems, 1)
- assert.Equal(t, wantResultItems, result1.Name)
- assert.Equal(t, reflect.TypeOf(TestUtilsFlows.Items), result1.Type)
-
- result2 := GetAttributeStructure(TestUtilsDevice, testInvalidVal, 1)
- assert.Empty(t, result2.Name)
- assert.Nil(t, result2.Type)
-}
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 383600d..1ed5b23 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -18,7 +18,6 @@
import (
"context"
- "errors"
"fmt"
"sync"
"time"
@@ -141,19 +140,17 @@
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() error {
// Load the adapters
- adaptersIf, err := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, "")
- if err != nil {
+ var adapters []*voltha.Adapter
+ if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
logger.Errorw("Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
- if adaptersIf != nil {
- for _, adapterIf := range adaptersIf.([]interface{}) {
- if adapter, ok := adapterIf.(*voltha.Adapter); ok {
- if err := aMgr.addAdapter(adapter, false); err != nil {
- logger.Errorw("failed to add adapter", log.Fields{"adapterId": adapter.Id})
- } else {
- logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
- }
+ if len(adapters) != 0 {
+ for _, adapter := range adapters {
+ if err := aMgr.addAdapter(adapter, false); err != nil {
+ logger.Errorw("failed to add adapter", log.Fields{"adapterId": adapter.Id})
+ } else {
+ logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
}
}
} else {
@@ -163,20 +160,16 @@
}
// Load the device types
- deviceTypesIf, err := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
- if err != nil {
+ var deviceTypes []*voltha.DeviceType
+ if err := aMgr.clusterDataProxy.List(context.Background(), "device_types", &deviceTypes); err != nil {
logger.Errorw("Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
- if deviceTypesIf != nil {
+ if len(deviceTypes) != 0 {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
- for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
- if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
- logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
- dTypes.Items = append(dTypes.Items, dType)
- } else {
- logger.Errorw("not an voltha device type", log.Fields{"interface": deviceTypeIf})
- }
+ for _, dType := range deviceTypes {
+ logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+ dTypes.Items = append(dTypes.Items, dType)
}
return aMgr.addDeviceTypes(dTypes, false)
}
@@ -203,22 +196,14 @@
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- kvAdapter, err := aMgr.clusterDataProxy.Get(context.Background(), "/adapters/"+adapter.Id, 0, false, "")
- if err != nil {
+ if have, err := aMgr.clusterDataProxy.Get(context.Background(), "adapters/"+adapter.Id, &voltha.Adapter{}); err != nil {
logger.Errorw("failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
- }
- if kvAdapter == nil {
- added, err := aMgr.clusterDataProxy.AddWithID(context.Background(), "/adapters", adapter.Id, adapter, "")
- if err != nil {
+ } else if !have {
+ if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
return err
}
- if added == nil {
- //TODO: Errors when saving to KV would require a separate go routine to be launched and try the saving again
- logger.Errorw("failed-to-save-adapter", log.Fields{"adapter": adapter})
- return errors.New("failed-to-save-adapter")
- }
logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
}
}
@@ -241,23 +226,16 @@
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
- dType, err := aMgr.clusterDataProxy.Get(context.Background(), "/device_types/"+deviceType.Id, 0, false, "")
- if err != nil {
+ if have, err := aMgr.clusterDataProxy.Get(context.Background(), "device_types/"+deviceType.Id, &voltha.DeviceType{}); err != nil {
logger.Errorw("Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
- }
- if dType == nil {
+ } else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- added, err := aMgr.clusterDataProxy.AddWithID(context.Background(), "/device_types", deviceType.Id, clonedDType, "")
- if err != nil {
+ if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "device_types", deviceType.Id, clonedDType); err != nil {
logger.Errorw("Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}
- if added == nil {
- logger.Errorw("failed-to-save-deviceType", log.Fields{"deviceType": deviceType})
- return errors.New("failed-to-save-deviceType")
- }
logger.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
}
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 69cd3c8..5043d47 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -18,7 +18,6 @@
import (
"context"
- "fmt"
"sync"
"time"
@@ -46,8 +45,6 @@
adapterMgr *AdapterManager
config *config.RWCoreFlags
kmp kafka.InterContainerProxy
- clusterDataRoot model.Root
- localDataRoot model.Root
clusterDataProxy *model.Proxy
localDataProxy *model.Proxy
exitChannel chan struct{}
@@ -79,8 +76,6 @@
Timeout: cf.KVStoreTimeout,
LivenessChannelInterval: livenessChannelInterval,
PathPrefix: cf.KVStoreDataPrefix}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
return &core
}
@@ -112,18 +107,9 @@
if p != nil {
p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
}
- var err error
- core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(ctx, "/", false)
- if err != nil {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
- return fmt.Errorf("Failed to create cluster data proxy")
- }
- core.localDataProxy, err = core.localDataRoot.CreateProxy(ctx, "/", false)
- if err != nil {
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
- return fmt.Errorf("Failed to create local data proxy")
- }
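+ // the data proxies operate directly on the KV backend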
+ core.clusterDataProxy = model.NewProxy(&core.backend, "/")
+ core.localDataProxy = model.NewProxy(&core.backend, "/")
// core.kmp must be created before deviceMgr and adapterMgr, as they will make
// private copies of the pointer to core.kmp.
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4c2b9f6..69391fb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -19,6 +19,7 @@
import (
"context"
"encoding/hex"
+ "errors"
"fmt"
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -100,21 +101,17 @@
var device *voltha.Device
if deviceToCreate == nil {
// Load the existing device
- loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
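+ // have is false when the device is not in the KV store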
+ device := &voltha.Device{}
+ have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device)
if err != nil {
return nil, err
+ } else if !have {
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
}
- if loadedDevice != nil {
- var ok bool
- if device, ok = loadedDevice.(*voltha.Device); ok {
- agent.deviceType = device.Adapter
- agent.device = proto.Clone(device).(*voltha.Device)
- } else {
- return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
- }
- } else {
- return nil, status.Errorf(codes.NotFound, "device-%s-loading-failed", agent.deviceID)
- }
+
+ agent.deviceType = device.Adapter
+ agent.device = proto.Clone(device).(*voltha.Device)
+
logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
@@ -133,12 +130,8 @@
}
// Add the initial device to the local model
- added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
- if err != nil {
- return nil, err
- }
- if added == nil {
- return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
+ if err := agent.clusterDataProxy.AddWithID(ctx, "devices", agent.deviceID, device); err != nil {
+ return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
}
agent.device = device
}
@@ -163,13 +156,9 @@
logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
// Remove the device from the KV store
- removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
- if err != nil {
+ if err := agent.clusterDataProxy.Remove(ctx, "devices/"+agent.deviceID); err != nil {
return err
}
- if removed == nil {
- logger.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
- }
close(agent.exitChannel)
@@ -189,18 +178,17 @@
defer agent.requestQueue.RequestComplete()
logger.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
- device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
- if err != nil {
+ device := &voltha.Device{}
+ if have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device); err != nil {
logger.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
return
+ } else if !have {
+ return // not found in kv
}
- if device != nil {
- if d, ok := device.(*voltha.Device); ok {
- agent.deviceType = d.Adapter
- agent.device = proto.Clone(d).(*voltha.Device)
- logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
- }
- }
+
+ agent.deviceType = device.Adapter
+ agent.device = device
+ logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
@@ -1527,13 +1515,13 @@
//This is an update operation to the model without a lock. This function must never be invoked by another function unless the latter holds a lock on the device.
// It is an internal helper function.
func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
- if err != nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
+ if agent.stopped {
+ return errors.New("device agent stopped")
}
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if err := agent.clusterDataProxy.Update(updateCtx, "devices/"+agent.deviceID, device); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index b88f38b..afe84e8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -382,26 +382,25 @@
func (dMgr *DeviceManager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
logger.Debug("ListDevices")
result := &voltha.Devices{}
- devices, err := dMgr.clusterDataProxy.List(ctx, "/devices", 0, false, "")
- if err != nil {
+
+ var devices []*voltha.Device
+ if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
logger.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
}
- if devices != nil {
- for _, d := range devices.([]interface{}) {
- device := d.(*voltha.Device)
- // If device is not in memory then set it up
- if !dMgr.IsDeviceInCache(device.Id) {
- logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if _, err := agent.start(ctx, nil); err != nil {
- logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
- } else {
- dMgr.addDeviceAgentToMap(agent)
- }
+
+ for _, device := range devices {
+ // If device is not in memory then set it up
+ if !dMgr.IsDeviceInCache(device.Id) {
+ logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ if _, err := agent.start(ctx, nil); err != nil {
+ logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
}
- result.Items = append(result.Items, device)
}
+ result.Items = append(result.Items, device)
}
logger.Debugw("ListDevices-end", log.Fields{"len": len(result.Items)})
return result, nil
@@ -410,22 +409,20 @@
//isParentDeviceExist checks whether device is already preprovisioned.
func (dMgr *DeviceManager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
- devices, err := dMgr.clusterDataProxy.List(ctx, "/devices", 0, false, "")
- if err != nil {
+ var devices []*voltha.Device
+ if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
logger.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
return false, err
}
- if devices != nil {
- for _, device := range devices.([]interface{}) {
- if !device.(*voltha.Device).Root {
- continue
- }
- if hostPort != "" && hostPort == device.(*voltha.Device).GetHostAndPort() && device.(*voltha.Device).AdminState != voltha.AdminState_DELETED {
- return true, nil
- }
- if newDevice.MacAddress != "" && newDevice.MacAddress == device.(*voltha.Device).MacAddress && device.(*voltha.Device).AdminState != voltha.AdminState_DELETED {
- return true, nil
- }
+ for _, device := range devices {
+ if !device.Root {
+ continue
+ }
+ if hostPort != "" && hostPort == device.GetHostAndPort() && device.AdminState != voltha.AdminState_DELETED {
+ return true, nil
+ }
+ if newDevice.MacAddress != "" && newDevice.MacAddress == device.MacAddress && device.AdminState != voltha.AdminState_DELETED {
+ return true, nil
}
}
return false, nil
@@ -433,17 +430,15 @@
//getDeviceFromModel retrieves the device data from the model.
func (dMgr *DeviceManager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
- device, err := dMgr.clusterDataProxy.Get(ctx, "/devices/"+deviceID, 0, false, "")
- if err != nil {
+ device := &voltha.Device{}
+ if have, err := dMgr.clusterDataProxy.Get(ctx, "devices/"+deviceID, device); err != nil {
logger.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
return nil, err
+ } else if !have {
+ return nil, status.Error(codes.NotFound, deviceID)
}
- if device != nil {
- if d, ok := device.(*voltha.Device); ok {
- return d, nil
- }
- }
- return nil, status.Error(codes.NotFound, deviceID)
+
+ return device, nil
}
// loadDevice loads the deviceID in memory, if not present
@@ -947,21 +942,17 @@
if deviceType == "" && vendorID != "" {
logger.Debug("device-type-is-nil-fetching-device-type")
- deviceTypesIf, err := dMgr.adapterMgr.clusterDataProxy.List(ctx, "/device_types", 0, false, "")
- if err != nil {
+ var deviceTypes []*voltha.DeviceType
+ if err := dMgr.adapterMgr.clusterDataProxy.List(ctx, "device_types", &deviceTypes); err != nil {
logger.Errorw("failed-to-get-device-type-info", log.Fields{"error": err})
return nil, err
}
- if deviceTypesIf != nil {
- OLoop:
- for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
- if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
- for _, v := range dType.VendorIds {
- if v == vendorID {
- deviceType = dType.Adapter
- break OLoop
- }
- }
+ OLoop:
+ for _, dType := range deviceTypes {
+ for _, v := range dType.VendorIds {
+ if v == vendorID {
+ deviceType = dType.Adapter
+ break OLoop
}
}
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 2616540..76067bb 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -47,7 +47,7 @@
deviceMgr *DeviceManager
ldeviceMgr *LogicalDeviceManager
clusterDataProxy *model.Proxy
- exitChannel chan int
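+ // set when the agent is stopped; guards against further writes to the store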
+ stopped bool
deviceRoutes *route.DeviceRoutes
lockDeviceRoutes sync.RWMutex
logicalPortsNo map[uint32]bool //value is true for NNI port
@@ -63,7 +63,6 @@
func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalDeviceManager,
deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
- agent.exitChannel = make(chan int, 1)
agent.logicalDeviceID = id
agent.serialNumber = sn
agent.rootDeviceID = deviceID
@@ -119,15 +118,11 @@
ld.Ports = []*voltha.LogicalPort{}
// Save the logical device
- added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
- if err != nil {
+ if err := agent.clusterDataProxy.AddWithID(ctx, "logical_devices", ld.Id, ld); err != nil {
+ logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
- if added == nil {
- logger.Errorw("failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
- } else {
- logger.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
- }
+ logger.Debugw("logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
@@ -141,14 +136,14 @@
} else {
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
- logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ ld := &voltha.LogicalDevice{}
+ have, err := agent.clusterDataProxy.Get(ctx, "logical_devices/"+agent.logicalDeviceID, ld)
if err != nil {
return err
- }
- ld, ok := logicalDevice.(*voltha.LogicalDevice)
- if !ok {
+ } else if !have {
return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
}
+
// Update the root device Id
agent.rootDeviceID = ld.RootDeviceId
@@ -186,15 +181,13 @@
defer agent.requestQueue.RequestComplete()
//Remove the logical device from the model
- if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+ if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
returnErr = err
- } else if removed == nil {
- returnErr = status.Errorf(codes.Aborted, "failed-to-remove-logical-ldevice-%s", agent.logicalDeviceID)
} else {
logger.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
- close(agent.exitChannel)
+ agent.stopped = true
logger.Info("logical_device-agent-stopped")
})
@@ -539,16 +532,16 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
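+ // once stop() has removed the logical device from the store, reject any further update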
+ if agent.stopped {
+ return errors.New("logical device agent stopped")
+ }
+
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
- if err != nil {
+ if err := agent.clusterDataProxy.Update(updateCtx, "logical_devices/"+agent.logicalDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
return err
}
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
- }
- //agent.logicalDevice = (proto.Clone(logicalDevice)).(*voltha.LogicalDevice)
+
agent.logicalDevice = logicalDevice
return nil
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 0babfad..175fe06 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -487,9 +487,8 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
+ err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
assert.Nil(t, err)
- assert.NotNil(t, added)
lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
return lDeviceAgent
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index f2554d6..86e6b63 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -137,18 +137,13 @@
//listLogicalDevices returns the list of all logical devices
func (ldMgr *LogicalDeviceManager) listLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
logger.Debug("ListAllLogicalDevices")
- result := &voltha.LogicalDevices{}
- logicalDevices, err := ldMgr.clusterDataProxy.List(ctx, "/logical_devices", 0, true, "")
- if err != nil {
+
+ var logicalDevices []*voltha.LogicalDevice
+ if err := ldMgr.clusterDataProxy.List(ctx, "logical_devices", &logicalDevices); err != nil {
logger.Errorw("failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
}
- if logicalDevices != nil {
- for _, logicalDevice := range logicalDevices.([]interface{}) {
- result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
- }
- }
- return result, nil
+ return &voltha.LogicalDevices{Items: logicalDevices}, nil
}
func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
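
listLogicalDevices above now passes a pointer to a typed slice into List instead of unpacking a []interface{} with per-element type assertions. A compact sketch of the pattern, with Lister as an assumed interface matching that call shape:

```go
// Illustrative sketch of the typed List usage.
package sketch

import (
	"context"

	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// Lister mirrors the List call shape used above (assumed interface).
type Lister interface {
	List(ctx context.Context, path string, value interface{}) error
}

func listLogicalDevices(ctx context.Context, proxy Lister) (*voltha.LogicalDevices, error) {
	var logicalDevices []*voltha.LogicalDevice
	if err := proxy.List(ctx, "logical_devices", &logicalDevices); err != nil {
		return nil, err
	}
	return &voltha.LogicalDevices{Items: logicalDevices}, nil
}
```
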
@@ -218,17 +213,15 @@
//getLogicalDeviceFromModel retrieves the logical device data from the model.
func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
- logicalDevice, err := ldMgr.clusterDataProxy.Get(ctx, "/logical_devices/"+lDeviceID, 0, false, "")
- if err != nil {
+ logicalDevice := &voltha.LogicalDevice{}
+ if have, err := ldMgr.clusterDataProxy.Get(ctx, "logical_devices/"+lDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
return nil, err
+ } else if !have {
+ return nil, status.Error(codes.NotFound, lDeviceID)
}
- if logicalDevice != nil {
- if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- return lDevice, nil
- }
- }
- return nil, status.Error(codes.NotFound, lDeviceID)
+
+ return logicalDevice, nil
}
// load loads a logical device manager in memory
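
Because getLogicalDeviceFromModel now reports absence as a gRPC NotFound status rather than an untyped nil, callers can separate "not in the store" from real failures with status.Code. A speculative example of that caller-side check (the fetch parameter stands in for the manager method):

```go
// Illustrative caller-side handling of the NotFound status.
package sketch

import (
	"context"

	"github.com/opencord/voltha-protos/v3/go/voltha"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func logicalDeviceExists(ctx context.Context, fetch func(context.Context, string) (*voltha.LogicalDevice, error), id string) (bool, error) {
	if _, err := fetch(ctx, id); err != nil {
		if status.Code(err) == codes.NotFound {
			return false, nil // the device simply is not in the store
		}
		return false, err // a genuine store/transport failure
	}
	return true, nil
}
```
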
diff --git a/vendor/github.com/golang/protobuf/descriptor/descriptor.go b/vendor/github.com/golang/protobuf/descriptor/descriptor.go
deleted file mode 100644
index ac7e51b..0000000
--- a/vendor/github.com/golang/protobuf/descriptor/descriptor.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Go support for Protocol Buffers - Google's data interchange format
-//
-// Copyright 2016 The Go Authors. All rights reserved.
-// https://github.com/golang/protobuf
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// Package descriptor provides functions for obtaining protocol buffer
-// descriptors for generated Go types.
-//
-// These functions cannot go in package proto because they depend on the
-// generated protobuf descriptor messages, which themselves depend on proto.
-package descriptor
-
-import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io/ioutil"
-
- "github.com/golang/protobuf/proto"
- protobuf "github.com/golang/protobuf/protoc-gen-go/descriptor"
-)
-
-// extractFile extracts a FileDescriptorProto from a gzip'd buffer.
-func extractFile(gz []byte) (*protobuf.FileDescriptorProto, error) {
- r, err := gzip.NewReader(bytes.NewReader(gz))
- if err != nil {
- return nil, fmt.Errorf("failed to open gzip reader: %v", err)
- }
- defer r.Close()
-
- b, err := ioutil.ReadAll(r)
- if err != nil {
- return nil, fmt.Errorf("failed to uncompress descriptor: %v", err)
- }
-
- fd := new(protobuf.FileDescriptorProto)
- if err := proto.Unmarshal(b, fd); err != nil {
- return nil, fmt.Errorf("malformed FileDescriptorProto: %v", err)
- }
-
- return fd, nil
-}
-
-// Message is a proto.Message with a method to return its descriptor.
-//
-// Message types generated by the protocol compiler always satisfy
-// the Message interface.
-type Message interface {
- proto.Message
- Descriptor() ([]byte, []int)
-}
-
-// ForMessage returns a FileDescriptorProto and a DescriptorProto from within it
-// describing the given message.
-func ForMessage(msg Message) (fd *protobuf.FileDescriptorProto, md *protobuf.DescriptorProto) {
- gz, path := msg.Descriptor()
- fd, err := extractFile(gz)
- if err != nil {
- panic(fmt.Sprintf("invalid FileDescriptorProto for %T: %v", msg, err))
- }
-
- md = fd.MessageType[path[0]]
- for _, i := range path[1:] {
- md = md.NestedType[i]
- }
- return fd, md
-}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f1ddebd..5b3c29c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -33,7 +33,6 @@
github.com/gogo/protobuf/proto
github.com/gogo/protobuf/protoc-gen-gogo/descriptor
# github.com/golang/protobuf v1.3.2
-github.com/golang/protobuf/descriptor
github.com/golang/protobuf/jsonpb
github.com/golang/protobuf/proto
github.com/golang/protobuf/protoc-gen-go/descriptor