VOL-1334 : Fixed concurrency issues
- Semaphores were added at the different layers of the model
- Made the proxy interfaces more robust
- Eliminated problems while retrieving latest data in concurrent mode
Change-Id: I7854105d7effa10e5cb704f5d9917569ab184f84
diff --git a/db/model/backend.go b/db/model/backend.go
index 25d568e..693ec02 100644
--- a/db/model/backend.go
+++ b/db/model/backend.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -21,6 +22,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"strconv"
+ "sync"
"time"
)
@@ -28,7 +30,9 @@
//TODO: missing retry stuff
//TODO: missing proper logging
+// Backend structure holds details for accessing the kv store
type Backend struct {
+ sync.RWMutex
Client kvstore.Client
StoreType string
Host string
@@ -37,6 +41,7 @@
PathPrefix string
}
+// NewBackend creates a new instance of a Backend structure
func NewBackend(storeType string, host string, port int, timeout int, pathPrefix string) *Backend {
var err error
@@ -70,23 +75,53 @@
path := fmt.Sprintf("%s/%s", b.PathPrefix, key)
return path
}
+
+// List retrieves one or more items that match the specified key
func (b *Backend) List(key string) (map[string]*kvstore.KVPair, error) {
- return b.Client.List(b.makePath(key), b.Timeout)
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ log.Debugf("List key: %s, path: %s", key, formattedPath)
+
+ return b.Client.List(formattedPath, b.Timeout)
}
+
+// Get retrieves an item that matches the specified key
func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
+ b.Lock()
+ defer b.Unlock()
+
formattedPath := b.makePath(key)
log.Debugf("Get key: %s, path: %s", key, formattedPath)
+
start := time.Now()
err, pair := b.Client.Get(formattedPath, b.Timeout)
stop := time.Now()
+
GetProfiling().AddToDatabaseRetrieveTime(stop.Sub(start).Seconds())
+
return err, pair
}
+
+// Put stores an item value under the specifed key
func (b *Backend) Put(key string, value interface{}) error {
+ b.Lock()
+ defer b.Unlock()
+
formattedPath := b.makePath(key)
log.Debugf("Put key: %s, value: %+v, path: %s", key, string(value.([]byte)), formattedPath)
+
return b.Client.Put(formattedPath, value, b.Timeout)
}
+
+// Delete removes an item under the specified key
func (b *Backend) Delete(key string) error {
- return b.Client.Delete(b.makePath(key), b.Timeout)
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ log.Debugf("Delete key: %s, path: %s", key, formattedPath)
+
+ return b.Client.Delete(formattedPath, b.Timeout)
}
diff --git a/db/model/base_test.go b/db/model/base_test.go
index 1be34f6..010dff9 100644
--- a/db/model/base_test.go
+++ b/db/model/base_test.go
@@ -20,6 +20,7 @@
"github.com/opencord/voltha-go/protos/common"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
+ "sync"
)
type ModelTestConfig struct {
@@ -35,14 +36,22 @@
var (
modelTestConfig = &ModelTestConfig{
- DbPrefix: "service/voltha/data/core/0001",
- DbType: "etcd",
- DbHost: "localhost",
+ DbPrefix: "service/voltha/data/core/0001",
+ DbType: "etcd",
+ DbHost: "localhost",
//DbHost: "10.106.153.44",
DbPort: 2379,
DbTimeout: 5,
}
+ logports = []*voltha.LogicalPort{
+ {
+ Id: "123",
+ DeviceId: "logicalport-0-device-id",
+ DevicePortNo: 123,
+ RootPort: false,
+ },
+ }
ports = []*voltha.Port{
{
PortNo: 123,
@@ -62,15 +71,25 @@
Items: []*openflow_13.OfpFlowStats{stats},
}
device = &voltha.Device{
- Id: devId,
+ Id: devID,
Type: "simulated_olt",
Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
AdminState: voltha.AdminState_PREPROVISIONED,
Flows: flows,
Ports: ports,
}
- devId string
- targetDeviceId string
+
+ logicalDevice = &voltha.LogicalDevice{
+ Id: devID,
+ DatapathId: 0,
+ Ports: logports,
+ Flows: flows,
+ }
+
+ devID string
+ ldevID string
+ targetDevID string
+ targetLogDevID string
)
func init() {
@@ -89,6 +108,7 @@
msgClass := &voltha.Voltha{}
root := NewRoot(msgClass, modelTestConfig.Backend)
+ //root := NewRoot(msgClass, nil)
if modelTestConfig.Backend != nil {
modelTestConfig.Root = root.Load(msgClass)
@@ -98,7 +118,7 @@
GetProfiling().Report()
- modelTestConfig.RootProxy = modelTestConfig.Root.GetProxy("/", false)
+ modelTestConfig.RootProxy = modelTestConfig.Root.node.CreateProxy("/", false)
}
func commonCallback(args ...interface{}) interface{} {
@@ -107,10 +127,34 @@
for i := 0; i < len(args); i++ {
log.Infof("ARG %d : %+v", i, args[i])
}
+
+ mutex := sync.Mutex{}
execStatus := args[1].(*bool)
// Inform the caller that the callback was executed
+ mutex.Lock()
*execStatus = true
+ mutex.Unlock()
+
+ return nil
+}
+
+func commonCallback2(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %s", len(args))
+
+ return nil
+}
+
+func commonCallbackFunc(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %d", len(args))
+
+ for i := 0; i < len(args); i++ {
+ log.Infof("ARG %d : %+v", i, args[i])
+ }
+ execStatusFunc := args[1].(func(bool))
+
+ // Inform the caller that the callback was executed
+ execStatusFunc(true)
return nil
}
@@ -121,6 +165,7 @@
log.Infof("Running first callback - name: %s, id: %s\n", name, id)
return nil
}
+
func secondCallback(args ...interface{}) interface{} {
name := args[0].(map[string]string)
id := args[1]
@@ -129,6 +174,7 @@
//panic("Generating a panic in second callback")
return nil
}
+
func thirdCallback(args ...interface{}) interface{} {
name := args[0]
id := args[1].(*voltha.Device)
diff --git a/db/model/branch.go b/db/model/branch.go
index ae0441a..3408f18 100644
--- a/db/model/branch.go
+++ b/db/model/branch.go
@@ -13,12 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
+import "sync"
+
// TODO: implement weak references or something equivalent
// TODO: missing proper logging
+// Branch structure is used to classify a collection of transaction based revisions
type Branch struct {
+ sync.RWMutex
Node *node
Txid string
Origin Revision
@@ -26,24 +31,65 @@
Latest Revision
}
+// NewBranch creates a new instance of the Branch structure
func NewBranch(node *node, txid string, origin Revision, autoPrune bool) *Branch {
- cb := &Branch{}
- cb.Node = node
- cb.Txid = txid
- cb.Origin = origin
- cb.Revisions = make(map[string]Revision)
- cb.Latest = origin
+ b := &Branch{}
+ b.Node = node
+ b.Txid = txid
+ b.Origin = origin
+ b.Revisions = make(map[string]Revision)
+ b.Latest = origin
- return cb
+ return b
}
-// TODO: Check if the following are required
-func (cb *Branch) get(hash string) Revision {
- return cb.Revisions[hash]
+// SetLatest assigns the latest revision for this branch
+func (b *Branch) SetLatest(latest Revision) {
+ b.Lock()
+ defer b.Unlock()
+
+ b.Latest = latest
}
-func (cb *Branch) GetLatest() Revision {
- return cb.Latest
+
+// GetLatest retrieves the latest revision of the branch
+func (b *Branch) GetLatest() Revision {
+ b.Lock()
+ defer b.Unlock()
+
+ return b.Latest
}
-func (cb *Branch) GetOrigin() Revision {
- return cb.Origin
+
+// GetOrigin retrieves the original revision of the branch
+func (b *Branch) GetOrigin() Revision {
+ b.Lock()
+ defer b.Unlock()
+
+ return b.Origin
+}
+
+// AddRevision inserts a new revision to the branch
+func (b *Branch) AddRevision(revision Revision) {
+ if revision != nil && b.GetRevision(revision.GetHash()) == nil {
+ b.SetRevision(revision.GetHash(), revision)
+ }
+}
+
+// GetRevision pulls a revision entry at the specified hash
+func (b *Branch) GetRevision(hash string) Revision {
+ b.Lock()
+ defer b.Unlock()
+
+ if revision, ok := b.Revisions[hash]; !ok {
+ return revision
+ }
+
+ return nil
+}
+
+// SetRevision updates a revision entry at the specified hash
+func (b *Branch) SetRevision(hash string, revision Revision) {
+ b.Lock()
+ defer b.Unlock()
+
+ b.Revisions[hash] = revision
}
diff --git a/db/model/branch_test.go b/db/model/branch_test.go
index 1f2eec2..26129fa 100644
--- a/db/model/branch_test.go
+++ b/db/model/branch_test.go
@@ -58,7 +58,7 @@
}
func Test_ConfigBranch_GetRevision(t *testing.T) {
- rev := BRANCH.get(HASH)
+ rev := BRANCH.GetRevision(HASH)
t.Logf("Got revision for hash:%s rev:%+v\n", HASH, rev)
}
func Test_ConfigBranch_LatestRevision(t *testing.T) {
diff --git a/db/model/callback_type.go b/db/model/callback_type.go
index 8f55688..b530dee 100644
--- a/db/model/callback_type.go
+++ b/db/model/callback_type.go
@@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
+// CallbackType is an enumerated value to express when a callback should be executed
type CallbackType uint8
+// Enumerated list of callback types
const (
GET CallbackType = iota
PRE_UPDATE
diff --git a/db/model/child_type.go b/db/model/child_type.go
index a96883c..224da9f 100644
--- a/db/model/child_type.go
+++ b/db/model/child_type.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -26,20 +27,21 @@
"sync"
)
-type singleton struct {
- ChildrenFieldsCache map[interface{}]map[string]*ChildType
+type singletonChildTypeCache struct {
+ Cache map[interface{}]map[string]*ChildType
}
-var instance *singleton
-var once sync.Once
+var instanceChildTypeCache *singletonChildTypeCache
+var onceChildTypeCache sync.Once
-func GetInstance() *singleton {
- once.Do(func() {
- instance = &singleton{}
+func getChildTypeCache() *singletonChildTypeCache {
+ onceChildTypeCache.Do(func() {
+ instanceChildTypeCache = &singletonChildTypeCache{}
})
- return instance
+ return instanceChildTypeCache
}
+// ChildType structure contains construct details of an object
type ChildType struct {
ClassModule string
ClassType reflect.Type
@@ -48,22 +50,22 @@
KeyFromStr func(s string) interface{}
}
+// ChildrenFields retrieves list of child objects associated to a given interface
func ChildrenFields(cls interface{}) map[string]*ChildType {
if cls == nil {
return nil
}
var names map[string]*ChildType
- var names_exist bool
+ var namesExist bool
- if GetInstance().ChildrenFieldsCache == nil {
- GetInstance().ChildrenFieldsCache = make(map[interface{}]map[string]*ChildType)
+ if getChildTypeCache().Cache == nil {
+ getChildTypeCache().Cache = make(map[interface{}]map[string]*ChildType)
}
msgType := reflect.TypeOf(cls)
+ inst := getChildTypeCache()
- inst := GetInstance()
-
- if names, names_exist = inst.ChildrenFieldsCache[msgType.String()]; !names_exist {
+ if names, namesExist = inst.Cache[msgType.String()]; !namesExist {
names = make(map[string]*ChildType)
_, md := desc.ForMessage(cls.(desc.Message))
@@ -125,7 +127,9 @@
}
}
- GetInstance().ChildrenFieldsCache[msgType.String()] = names
+ getChildTypeCache().Cache[msgType.String()] = names
+ } else {
+ log.Debugf("Cache entry for %s: %+v", msgType.String(), inst.Cache[msgType.String()])
}
return names
diff --git a/db/model/child_type_test.go b/db/model/child_type_test.go
index 4659805..1fd81bc 100644
--- a/db/model/child_type_test.go
+++ b/db/model/child_type_test.go
@@ -24,10 +24,10 @@
*/
func Test_ChildType_01_CacheIsEmpty(t *testing.T) {
- if GetInstance().ChildrenFieldsCache != nil || len(GetInstance().ChildrenFieldsCache) > 0 {
- t.Errorf("GetInstance().ChildrenFieldsCache should be empty: %+v\n", GetInstance().ChildrenFieldsCache)
+ if getChildTypeCache().Cache != nil || len(getChildTypeCache().Cache) > 0 {
+ t.Errorf("getChildTypeCache().Cache should be empty: %+v\n", getChildTypeCache().Cache)
}
- t.Logf("GetInstance().ChildrenFieldsCache is empty - %+v\n", GetInstance().ChildrenFieldsCache)
+ t.Logf("getChildTypeCache().Cache is empty - %+v\n", getChildTypeCache().Cache)
}
/*
@@ -54,17 +54,17 @@
*/
func Test_ChildType_03_CacheHasOneEntry(t *testing.T) {
- if GetInstance().ChildrenFieldsCache == nil || len(GetInstance().ChildrenFieldsCache) != 1 {
- t.Errorf("GetInstance().ChildrenFieldsCache should have one entry: %+v\n", GetInstance().ChildrenFieldsCache)
+ if getChildTypeCache().Cache == nil || len(getChildTypeCache().Cache) != 1 {
+ t.Errorf("getChildTypeCache().Cache should have one entry: %+v\n", getChildTypeCache().Cache)
}
- t.Logf("GetInstance().ChildrenFieldsCache has one entry: %+v\n", GetInstance().ChildrenFieldsCache)
+ t.Logf("getChildTypeCache().Cache has one entry: %+v\n", getChildTypeCache().Cache)
}
/*
*/
-func Test_ChildType_04_ChildrenFieldsCache_Keys(t *testing.T) {
- for k := range GetInstance().ChildrenFieldsCache {
- t.Logf("GetInstance().ChildrenFieldsCache Key:%+v\n", k)
+func Test_ChildType_04_Cache_Keys(t *testing.T) {
+ for k := range getChildTypeCache().Cache {
+ t.Logf("getChildTypeCache().Cache Key:%+v\n", k)
}
}
diff --git a/db/model/data_revision.go b/db/model/data_revision.go
index 9bf840d..0763d09 100644
--- a/db/model/data_revision.go
+++ b/db/model/data_revision.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -25,33 +26,37 @@
"reflect"
)
+// DataRevision stores the data associated to a revision along with its calculated checksum hash value
type DataRevision struct {
Data interface{}
Hash string
}
-func NewDataRevision(data interface{}) *DataRevision {
- cdr := &DataRevision{}
- cdr.Data = data
- cdr.Hash = cdr.hashData(data)
+// NewDataRevision creates a new instance of a DataRevision structure
+func NewDataRevision(root *root, data interface{}) *DataRevision {
+ dr := DataRevision{}
+ dr.Data = data
+ dr.Hash = dr.hashData(root, data)
- return cdr
+ return &dr
}
-func (cr *DataRevision) hashData(data interface{}) string {
+func (dr *DataRevision) hashData(root *root, data interface{}) string {
var buffer bytes.Buffer
if IsProtoMessage(data) {
if pbdata, err := proto.Marshal(data.(proto.Message)); err != nil {
- log.Errorf("problem to marshal protobuf data --> err: %s", err.Error())
+ log.Debugf("problem to marshal protobuf data --> err: %s", err.Error())
} else {
buffer.Write(pbdata)
+ // To ensure uniqueness in case data is nil, also include data type
+ buffer.Write([]byte(reflect.TypeOf(data).String()))
}
} else if reflect.ValueOf(data).IsValid() {
dataObj := reflect.New(reflect.TypeOf(data).Elem())
if json, err := json.Marshal(dataObj.Interface()); err != nil {
- log.Errorf("problem to marshal data --> err: %s", err.Error())
+ log.Debugf("problem to marshal data --> err: %s", err.Error())
} else {
buffer.Write(json)
}
@@ -60,5 +65,9 @@
buffer.Write(dataObj.Bytes())
}
+ // Add the root pointer that owns the current data for extra uniqueness
+ rootPtr := fmt.Sprintf("%p", root)
+ buffer.Write([]byte(rootPtr))
+
return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
}
diff --git a/db/model/event_bus.go b/db/model/event_bus.go
index aaeb7ac..71e0922 100644
--- a/db/model/event_bus.go
+++ b/db/model/event_bus.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -22,13 +23,15 @@
"github.com/opencord/voltha-go/protos/voltha"
)
+// EventBus contains the details required to communicate with the event bus mechanism
type EventBus struct {
client *EventBusClient
topic string
}
+// ignoredCallbacks keeps a list of callbacks that should not be advertised on the event bus
var (
- IGNORED_CALLBACKS = map[CallbackType]struct{}{
+ ignoredCallbacks = map[CallbackType]struct{}{
PRE_ADD: {},
GET: {},
POST_LISTCHANGE: {},
@@ -37,6 +40,7 @@
}
)
+// NewEventBus creates a new instance of the EventBus structure
func NewEventBus() *EventBus {
bus := &EventBus{
client: NewEventBusClient(),
@@ -45,13 +49,13 @@
return bus
}
-//func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
+// Advertise will publish the provided information to the event bus
func (bus *EventBus) Advertise(args ...interface{}) interface{} {
eventType := args[0].(CallbackType)
hash := args[1].(string)
data := args[2:]
- if _, ok := IGNORED_CALLBACKS[eventType]; ok {
+ if _, ok := ignoredCallbacks[eventType]; ok {
log.Debugf("ignoring event - type:%s, data:%+v", eventType, data)
}
var kind voltha.ConfigEventType_ConfigEventType
@@ -68,14 +72,14 @@
var err error
if IsProtoMessage(data) {
if msg, err = proto.Marshal(data[0].(proto.Message)); err != nil {
- log.Errorf("problem marshalling proto data: %+v, err:%s", data[0], err.Error())
+ log.Debugf("problem marshalling proto data: %+v, err:%s", data[0], err.Error())
}
} else if data[0] != nil {
if msg, err = json.Marshal(data[0]); err != nil {
- log.Errorf("problem marshalling json data: %+v, err:%s", data[0], err.Error())
+ log.Debugf("problem marshalling json data: %+v, err:%s", data[0], err.Error())
}
} else {
- log.Errorf("no data to advertise : %+v", data[0])
+ log.Debugf("no data to advertise : %+v", data[0])
}
event := voltha.ConfigEvent{
diff --git a/db/model/event_bus_client.go b/db/model/event_bus_client.go
index 227cb3c..f038cad 100644
--- a/db/model/event_bus_client.go
+++ b/db/model/event_bus_client.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -20,13 +21,16 @@
"github.com/opencord/voltha-go/protos/voltha"
)
+// EventBusClient is an abstraction layer structure to communicate with an event bus mechanism
type EventBusClient struct {
}
+// NewEventBusClient creates a new EventBusClient instance
func NewEventBusClient() *EventBusClient {
return &EventBusClient{}
}
+// Publish sends a event to the bus
func (ebc *EventBusClient) Publish(topic string, event voltha.ConfigEvent) {
log.Debugf("publishing event:%+v, topic:%s\n", event, topic)
}
diff --git a/db/model/merge.go b/db/model/merge.go
index 0546bd3..e40f98b 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
"github.com/opencord/voltha-go/common/log"
- "reflect"
)
func revisionsAreEqual(a, b []Revision) bool {
@@ -65,17 +65,17 @@
_, v := GetAttributeValue(rev.GetData(), keyName, 0)
changes.KeyMap2[v.String()] = i
}
- for v, _ := range changes.KeyMap2 {
+ for v := range changes.KeyMap2 {
if _, ok := changes.KeyMap1[v]; !ok {
changes.AddedKeys[v] = struct{}{}
}
}
- for v, _ := range changes.KeyMap1 {
+ for v := range changes.KeyMap1 {
if _, ok := changes.KeyMap2[v]; !ok {
changes.RemovedKeys[v] = struct{}{}
}
}
- for v, _ := range changes.KeyMap1 {
+ for v := range changes.KeyMap1 {
if _, ok := changes.KeyMap2[v]; ok && lst1[changes.KeyMap1[v]].GetHash() != lst2[changes.KeyMap2[v]].GetHash() {
changes.ChangedKeys[v] = struct{}{}
}
@@ -84,6 +84,7 @@
return changes
}
+// Merge3Way takes care of combining the revision contents of the same data set
func Merge3Way(
forkRev, srcRev, dstRev Revision,
mergeChildFunc func(Revision) Revision,
@@ -101,7 +102,14 @@
configChanged = true
}
- newChildren := reflect.ValueOf(dstRev.GetChildren()).Interface().(map[string][]Revision)
+ //newChildren := reflect.ValueOf(dstRev.GetChildren()).Interface().(map[string][]Revision)
+ newChildren := make(map[string][]Revision)
+ for entryName, childrenEntry := range dstRev.GetChildren() {
+ //newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
+ newChildren[entryName] = make([]Revision, len(childrenEntry))
+ copy(newChildren[entryName], childrenEntry)
+ }
+
childrenFields := ChildrenFields(forkRev.GetData())
for fieldName, field := range childrenFields {
@@ -140,9 +148,10 @@
if revisionsAreEqual(dstList, forkList) {
src := newChangeAnalysis(forkList, srcList, field.Key)
- newList := reflect.ValueOf(srcList).Interface().([]Revision)
+ newList := make([]Revision, len(srcList))
+ copy(newList, srcList)
- for key, _ := range src.AddedKeys {
+ for key := range src.AddedKeys {
idx := src.KeyMap2[key]
newRev := mergeChildFunc(newList[idx])
@@ -152,12 +161,12 @@
changes = append(changes, ChangeTuple{POST_ADD, newList[idx].GetData(), newRev.GetData()})
}
}
- for key, _ := range src.RemovedKeys {
+ for key := range src.RemovedKeys {
oldRev := forkList[src.KeyMap1[key]]
revsToDiscard = append(revsToDiscard, oldRev)
changes = append(changes, ChangeTuple{POST_REMOVE, oldRev.GetData(), nil})
}
- for key, _ := range src.ChangedKeys {
+ for key := range src.ChangedKeys {
idx := src.KeyMap2[key]
newRev := mergeChildFunc(newList[idx])
@@ -174,9 +183,10 @@
src := newChangeAnalysis(forkList, srcList, field.Key)
dst := newChangeAnalysis(forkList, dstList, field.Key)
- newList := reflect.ValueOf(dstList).Interface().([]Revision)
+ newList := make([]Revision, len(dstList))
+ copy(newList, dstList)
- for key, _ := range src.AddedKeys {
+ for key := range src.AddedKeys {
if _, exists := dst.AddedKeys[key]; exists {
childDstRev := dstList[dst.KeyMap2[key]]
childSrcRev := srcList[src.KeyMap2[key]]
@@ -191,7 +201,7 @@
changes = append(changes, ChangeTuple{POST_ADD, srcList[src.KeyMap2[key]], newRev.GetData()})
}
}
- for key, _ := range src.ChangedKeys {
+ for key := range src.ChangedKeys {
if _, removed := dst.RemovedKeys[key]; removed {
log.Error("conflict error - revision has been removed")
} else if _, changed := dst.ChangedKeys[key]; changed {
@@ -212,7 +222,7 @@
}
// TODO: how do i sort this map in reverse order?
- for key, _ := range src.RemovedKeys {
+ for key := range src.RemovedKeys {
if _, changed := dst.ChangedKeys[key]; changed {
log.Error("conflict error - revision has changed")
}
@@ -236,7 +246,7 @@
}
}
- if !dryRun {
+ if !dryRun && len(newChildren) > 0{
if configChanged {
rev = srcRev
} else {
@@ -247,15 +257,15 @@
discarded.Drop("", true)
}
- dstRev.GetBranch().Latest.Drop("", configChanged)
+ dstRev.GetBranch().GetLatest().Drop("", configChanged)
rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
if configChanged {
- changes = append(changes, ChangeTuple{POST_UPDATE, dstRev.GetBranch().Latest.GetData(), rev.GetData()})
+ // FIXME: what type of previous/latest data do we want to show? Specific node or Root
+ changes = append(changes, ChangeTuple{POST_UPDATE, dstRev.GetBranch().GetLatest().GetData(), rev.GetData()})
}
return rev, changes
- } else {
- return nil, nil
-
}
+
+ return nil, nil
}
diff --git a/db/model/node.go b/db/model/node.go
index a3e6ea7..7ea41ce 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
// TODO: proper error handling
@@ -23,13 +24,17 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "runtime/debug"
"strings"
+ "sync"
)
+// When a branch has no transaction id, everything gets stored in NONE
const (
NONE string = "none"
)
+// Node interface is an abstraction of the node data structure
type Node interface {
MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
@@ -47,14 +52,12 @@
DeleteTxBranch(txid string)
FoldTxBranch(txid string)
- GetProxy(path string, exclusive bool) *Proxy
-
- ExecuteCallbacks()
- AddCallback(callback CallbackFunction, args ...interface{})
- AddNotificationCallback(callback CallbackFunction, args ...interface{})
+ CreateProxy(path string, exclusive bool) *Proxy
+ GetProxy() *Proxy
}
type node struct {
+ sync.RWMutex
Root *root
Type interface{}
Branches map[string]*Branch
@@ -64,12 +67,14 @@
AutoPrune bool
}
+// ChangeTuple holds details of modifications made to a revision
type ChangeTuple struct {
Type CallbackType
PreviousData interface{}
LatestData interface{}
}
+// NewNode creates a new instance of the node data structure
func NewNode(root *root, initialData interface{}, autoPrune bool, txid string) *node {
n := &node{}
@@ -85,6 +90,9 @@
dataCopy := proto.Clone(initialData.(proto.Message))
n.initialize(dataCopy, txid)
} else if reflect.ValueOf(initialData).IsValid() {
+ // FIXME: this block does not reflect the original implementation
+ // it should be checking if the provided initial_data is already a type!??!
+ // it should be checked before IsProtoMessage
n.Type = reflect.ValueOf(initialData).Interface()
} else {
// not implemented error
@@ -94,39 +102,33 @@
return n
}
+// MakeNode creates a new node in the tree
func (n *node) MakeNode(data interface{}, txid string) *node {
return NewNode(n.Root, data, true, txid)
}
+// MakeRevision create a new revision of the node in the tree
func (n *node) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- if n.Root.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
- return NewPersistedRevision(branch, data, children)
- }
-
- return NewNonPersistedRevision(branch, data, children)
+ return n.GetRoot().MakeRevision(branch, data, children)
}
-func (n *node) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- n.makeLatest(branch, revision, changeAnnouncement)
-}
+// makeLatest will mark the revision of a node as being the latest
func (n *node) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- if _, ok := branch.Revisions[revision.GetHash()]; !ok {
- branch.Revisions[revision.GetHash()] = revision
- }
+ branch.AddRevision(revision)
- if branch.Latest == nil || revision.GetHash() != branch.Latest.GetHash() {
- branch.Latest = revision
+ if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
+ branch.SetLatest(revision)
}
if changeAnnouncement != nil && branch.Txid == "" {
- if n.Proxy != nil {
+ if n.GetProxy() != nil {
for _, change := range changeAnnouncement {
log.Debugf("invoking callback - changeType: %+v, previous:%+v, latest: %+v",
change.Type,
change.PreviousData,
change.LatestData)
- n.Root.AddCallback(
- n.Proxy.InvokeCallbacks,
+ n.GetRoot().AddCallback(
+ n.GetProxy().InvokeCallbacks,
change.Type,
true,
change.PreviousData,
@@ -139,7 +141,7 @@
change.Type,
change.PreviousData,
change.LatestData)
- n.Root.AddNotificationCallback(
+ n.GetRoot().AddNotificationCallback(
n.makeEventBus().Advertise,
change.Type,
revision.GetHash(),
@@ -149,27 +151,24 @@
}
}
+// Latest returns the latest revision of node with or without the transaction id
func (n *node) Latest(txid ...string) Revision {
var branch *Branch
- var exists bool
if len(txid) > 0 && txid[0] != "" {
- if branch, exists = n.Branches[txid[0]]; exists {
- return branch.Latest
+ if branch = n.GetBranch(txid[0]); branch != nil {
+ return branch.GetLatest()
}
- } else if branch, exists = n.Branches[NONE]; exists {
- return branch.Latest
+ } else if branch = n.GetBranch(NONE); branch != nil {
+ return branch.GetLatest()
}
return nil
}
-func (n *node) GetHash(hash string) Revision {
- return n.Branches[NONE].Revisions[hash]
-}
-
+// initialize prepares the content of a node along with its possible ramifications
func (n *node) initialize(data interface{}, txid string) {
- var children map[string][]Revision
- children = make(map[string][]Revision)
+ n.Lock()
+ children := make(map[string][]Revision)
for fieldName, field := range ChildrenFields(n.Type) {
_, fieldValue := GetAttributeValue(data, fieldName, 0)
@@ -181,7 +180,9 @@
for i := 0; i < fieldValue.Len(); i++ {
v := fieldValue.Index(i)
- rev := n.MakeNode(v.Interface(), txid).Latest(txid)
+ if rev := n.MakeNode(v.Interface(), txid).Latest(txid); rev != nil {
+ children[fieldName] = append(children[fieldName], rev)
+ }
_, key := GetAttributeValue(v.Interface(), field.Key, 0)
for _, k := range keysSeen {
@@ -189,59 +190,60 @@
log.Errorf("duplicate key - %s", k)
}
}
- children[fieldName] = append(children[fieldName], rev)
keysSeen = append(keysSeen, key.String())
}
} else {
for i := 0; i < fieldValue.Len(); i++ {
v := fieldValue.Index(i)
- children[fieldName] = append(children[fieldName], n.MakeNode(v.Interface(), txid).Latest())
+ if newNodeRev := n.MakeNode(v.Interface(), txid).Latest(); newNodeRev != nil {
+ children[fieldName] = append(children[fieldName], newNodeRev)
+ }
}
}
} else {
- children[fieldName] = append(children[fieldName], n.MakeNode(fieldValue.Interface(), txid).Latest())
+ if newNodeRev := n.MakeNode(fieldValue.Interface(), txid).Latest(); newNodeRev != nil {
+ children[fieldName] = append(children[fieldName], newNodeRev)
+ }
}
} else {
log.Errorf("field is invalid - %+v", fieldValue)
}
}
- // FIXME: ClearField??? No such method in go protos. Reset?
- //data.ClearField(field_name)
+ n.Unlock()
+
branch := NewBranch(n, "", nil, n.AutoPrune)
rev := n.MakeRevision(branch, data, children)
n.makeLatest(branch, rev, nil)
if txid == "" {
- n.Branches[NONE] = branch
+ n.SetBranch(NONE, branch)
} else {
- n.Branches[txid] = branch
+ n.SetBranch(txid, branch)
}
}
+// findRevByKey retrieves a specific revision from a node tree
func (n *node) findRevByKey(revs []Revision, keyName string, value interface{}) (int, Revision) {
+ n.Lock()
+ defer n.Unlock()
for i, rev := range revs {
dataValue := reflect.ValueOf(rev.GetData())
dataStruct := GetAttributeStructure(rev.GetData(), keyName, 0)
fieldValue := dataValue.Elem().FieldByName(dataStruct.Name)
- //log.Debugf("fieldValue: %+v, type: %+v, value: %+v", fieldValue.Interface(), fieldValue.Type(), value)
a := fmt.Sprintf("%s", fieldValue.Interface())
b := fmt.Sprintf("%s", value)
if a == b {
- return i, rev
+ return i, revs[i]
}
}
- log.Errorf("key %s=%s not found", keyName, value)
-
return -1, nil
}
-//
-// Get operation
-//
+// Get retrieves the data from a node tree that resides at the specified path
func (n *node) Get(path string, hash string, depth int, deep bool, txid string) interface{} {
if deep {
depth = -1
@@ -254,20 +256,20 @@
var branch *Branch
var rev Revision
- // FIXME: should empty txid be cleaned up?
- if branch = n.Branches[txid]; txid == "" || branch == nil {
- branch = n.Branches[NONE]
+ if branch = n.GetBranch(txid); txid == "" || branch == nil {
+ branch = n.GetBranch(NONE)
}
if hash != "" {
- rev = branch.Revisions[hash]
+ rev = branch.GetRevision(hash)
} else {
- rev = branch.Latest
+ rev = branch.GetLatest()
}
return n.getPath(rev, path, depth)
}
+// getPath traverses the specified path and retrieves the data associated to it
func (n *node) getPath(rev Revision, path string, depth int) interface{} {
if path == "" {
return n.getData(rev, depth)
@@ -286,8 +288,10 @@
field := names[name]
if field.IsContainer {
+ children := make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
if field.Key != "" {
- children := rev.GetChildren()[name]
if path != "" {
partition = strings.SplitN(path, "/", 2)
key := partition[0]
@@ -314,60 +318,57 @@
// TODO: raise error
return response
}
- for _, childRev := range rev.GetChildren()[name] {
+ for _, childRev := range children {
childNode := childRev.GetNode()
value := childNode.getData(childRev, depth)
response = append(response, value)
}
return response
}
- } else {
- childRev := rev.GetChildren()[name][0]
- childNode := childRev.GetNode()
- return childNode.getPath(childRev, path, depth)
}
- return nil
+
+ childRev := rev.GetChildren()[name][0]
+ childNode := childRev.GetNode()
+ return childNode.getPath(childRev, path, depth)
}
+// getData retrieves the data from a node revision
func (n *node) getData(rev Revision, depth int) interface{} {
- msg := rev.Get(depth)
+ msg := rev.GetBranch().GetLatest().Get(depth)
var modifiedMsg interface{}
- if n.Proxy != nil {
- log.Debug("invoking proxy GET Callbacks")
- if modifiedMsg = n.Proxy.InvokeCallbacks(GET, false, msg); modifiedMsg != nil {
+ if n.GetProxy() != nil {
+ log.Debug("invoking proxy GET Callbacks : %+v", msg)
+ if modifiedMsg = n.GetProxy().InvokeCallbacks(GET, false, msg); modifiedMsg != nil {
msg = modifiedMsg
}
}
+
return msg
}
-//
-// Update operation
-//
+// Update changes the content of a node at the specified path with the provided data
func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
- // FIXME: is this required ... a bit overkill to take out a "/"
for strings.HasPrefix(path, "/") {
path = path[1:]
}
var branch *Branch
- var ok bool
if txid == "" {
- branch = n.Branches[NONE]
- } else if branch, ok = n.Branches[txid]; !ok {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); &branch == nil {
branch = makeBranch(n)
}
- log.Debugf("Branch data : %+v, Passed data: %+v", branch.Latest.GetData(), data)
-
+ if branch.GetLatest() != nil {
+ log.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
+ }
if path == "" {
return n.doUpdate(branch, data, strict)
}
- // TODO missing some code here...
- rev := branch.Latest
+ rev := branch.GetLatest()
partition := strings.SplitN(path, "/", 2)
name := partition[0]
@@ -393,14 +394,12 @@
path = partition[1]
}
keyValue := field.KeyFromStr(key)
- // TODO. Est-ce que le copy ne fonctionne pas? dois-je plutôt faire un clone de chaque item?
- for _, v := range rev.GetChildren()[name] {
- revCopy := reflect.ValueOf(v).Interface().(Revision)
- children = append(children, revCopy)
- }
+
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
- childNode.Proxy = n.Proxy
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
@@ -409,39 +408,45 @@
log.Debug("clear-hash - %s %+v", newChildRev.GetHash(), newChildRev)
newChildRev.ClearHash()
}
- return branch.Latest
+ return branch.GetLatest()
}
_, newKey := GetAttributeValue(newChildRev.GetData(), field.Key, 0)
- log.Debugf("newKey is %s", newKey.Interface())
+
_newKeyType := fmt.Sprintf("%s", newKey)
_keyValueType := fmt.Sprintf("%s", keyValue)
+
if _newKeyType != _keyValueType {
log.Errorf("cannot change key field")
}
+
children[idx] = newChildRev
- rev = rev.UpdateChildren(name, children, branch)
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, nil)
+
+ updatedRev := rev.UpdateChildren(name, children, branch)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, updatedRev, nil)
+
return newChildRev
+
} else {
log.Errorf("cannot index into container with no keys")
}
} else {
childRev := rev.GetChildren()[name][0]
childNode := childRev.GetNode()
- childNode.Proxy = n.Proxy
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
- rev = rev.UpdateChildren(name, []Revision{newChildRev}, branch)
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, nil)
+ updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
+ rev.Drop(txid, false)
+ n.makeLatest(branch, updatedRev, nil)
return newChildRev
}
return nil
}
func (n *node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
- log.Debugf("Comparing types - expected: %+v, actual: %+v", reflect.ValueOf(n.Type).Type(), reflect.TypeOf(data))
+ log.Debugf("Comparing types - expected: %+v, actual: %+v &&&&&& %s", reflect.ValueOf(n.Type).Type(),
+ reflect.TypeOf(data),
+ string(debug.Stack()))
if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
// TODO raise error
@@ -454,33 +459,33 @@
// return nil
//}
- if n.Proxy != nil {
+ if n.GetProxy() != nil {
log.Debug("invoking proxy PRE_UPDATE Callbacks")
- n.Proxy.InvokeCallbacks(PRE_UPDATE, false, branch.Latest.GetData(), data)
+ n.GetProxy().InvokeCallbacks(PRE_UPDATE, false, branch.GetLatest(), data)
}
- if !reflect.DeepEqual(branch.Latest.GetData(), data) {
+
+ if branch.GetLatest().GetData().(proto.Message).String() != data.(proto.Message).String() {
if strict {
// TODO: checkAccessViolations(data, Branch.GetLatest.data)
log.Debugf("checking access violations")
}
- rev := branch.Latest.UpdateData(data, branch)
- changes := []ChangeTuple{{POST_UPDATE, branch.Latest.GetData(), rev.GetData()}}
+
+ rev := branch.GetLatest().UpdateData(data, branch)
+ changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
// FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
// FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- //branch.Latest.Drop(branch.Txid, true)
+ //branch.GetLatest().Drop(branch.Txid, true)
- n.Root.Proxy = n.Proxy
- n.Root.MakeLatest(branch, rev, changes)
+ n.makeLatest(branch, rev, changes)
+
return rev
- } else {
- return branch.Latest
}
+
+ return branch.GetLatest()
}
-//
-// Add operation
-//
+// Add inserts a new node at the specified path with the provided data
func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -491,14 +496,13 @@
}
var branch *Branch
- var ok bool
if txid == "" {
- branch = n.Branches[NONE]
- } else if branch, ok = n.Branches[txid]; !ok {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); branch == nil {
branch = makeBranch(n)
}
- rev := branch.Latest
+ rev := branch.GetLatest()
partition := strings.SplitN(path, "/", 2)
name := partition[0]
@@ -510,37 +514,39 @@
}
field := ChildrenFields(n.Type)[name]
+
var children []Revision
if field.IsContainer {
if path == "" {
if field.Key != "" {
- if n.Proxy != nil {
+ if n.GetProxy() != nil {
log.Debug("invoking proxy PRE_ADD Callbacks")
- n.Proxy.InvokeCallbacks(PRE_ADD, false, data)
+ n.GetProxy().InvokeCallbacks(PRE_ADD, false, data)
}
- for _, v := range rev.GetChildren()[name] {
- revCopy := reflect.ValueOf(v).Interface().(Revision)
- children = append(children, revCopy)
- }
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
+
_, key := GetAttributeValue(data, field.Key, 0)
+
if _, exists := n.findRevByKey(children, field.Key, key.String()); exists != nil {
// TODO raise error
log.Errorf("duplicate key found: %s", key.String())
- } else {
- childRev := n.MakeNode(data, txid).Latest(txid)
- children = append(children, childRev)
- rev := rev.UpdateChildren(name, children, branch)
- changes := []ChangeTuple{{POST_ADD, nil, rev.GetData()}}
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, changes)
- return rev
+ return exists
}
+ childRev := n.MakeNode(data, txid).Latest(txid)
+ children = append(children, childRev)
+ rev = rev.UpdateChildren(name, children, branch)
+ changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
- } else {
- log.Errorf("cannot add to non-keyed container")
+ rev.Drop(txid, false)
+ n.makeLatest(branch, rev, changes)
+
+ return childRev
}
+ log.Errorf("cannot add to non-keyed container")
+
} else if field.Key != "" {
partition := strings.SplitN(path, "/", 2)
key := partition[0]
@@ -550,27 +556,33 @@
path = partition[1]
}
keyValue := field.KeyFromStr(key)
+
+ children = make([]Revision, len(rev.GetChildren()[name]))
copy(children, rev.GetChildren()[name])
+
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+
childNode := childRev.GetNode()
newChildRev := childNode.Add(path, data, txid, makeBranch)
+
children[idx] = newChildRev
- rev := rev.UpdateChildren(name, children, branch)
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, nil)
- return rev
+
+ rev = rev.UpdateChildren(name, branch.GetLatest().GetChildren()[name], branch)
+ rev.Drop(txid, false)
+ n.makeLatest(branch, rev.GetBranch().GetLatest(), nil)
+
+ return newChildRev
} else {
log.Errorf("cannot add to non-keyed container")
}
} else {
log.Errorf("cannot add to non-container field")
}
+
return nil
}
-//
-// Remove operation
-//
+// Remove eliminates a node at the specified path
func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -580,14 +592,13 @@
log.Errorf("cannot remove for non-container mode")
}
var branch *Branch
- var ok bool
if txid == "" {
- branch = n.Branches[NONE]
- } else if branch, ok = n.Branches[txid]; !ok {
+ branch = n.GetBranch(NONE)
+ } else if branch = n.GetBranch(txid); branch == nil {
branch = makeBranch(n)
}
- rev := branch.Latest
+ rev := branch.GetLatest()
partition := strings.SplitN(path, "/", 2)
name := partition[0]
@@ -613,42 +624,36 @@
path = partition[1]
}
keyValue := field.KeyFromStr(key)
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
if path != "" {
- for _, v := range rev.GetChildren()[name] {
- newV := reflect.ValueOf(v).Interface().(Revision)
- children = append(children, newV)
- }
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
newChildRev := childNode.Remove(path, txid, makeBranch)
children[idx] = newChildRev
rev := rev.UpdateChildren(name, children, branch)
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, nil)
- return rev
- } else {
- for _, v := range rev.GetChildren()[name] {
- newV := reflect.ValueOf(v).Interface().(Revision)
- children = append(children, newV)
- }
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
- if n.Proxy != nil {
- data := childRev.GetData()
- n.Proxy.InvokeCallbacks(PRE_REMOVE, false, data)
- postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data, nil})
- } else {
- postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData(), nil})
- }
- childRev.Drop(txid, true)
- children = append(children[:idx], children[idx+1:]...)
- rev := rev.UpdateChildren(name, children, branch)
- branch.Latest.Drop(txid, false)
- n.Root.MakeLatest(branch, rev, postAnnouncement)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, nil)
return rev
}
- } else {
- log.Errorf("cannot add to non-keyed container")
+ idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+ if n.GetProxy() != nil {
+ data := childRev.GetData()
+ n.GetProxy().InvokeCallbacks(PRE_REMOVE, false, data)
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, data, nil})
+ } else {
+ postAnnouncement = append(postAnnouncement, ChangeTuple{POST_REMOVE, childRev.GetData(), nil})
+ }
+ childRev.Drop(txid, true)
+ children = append(children[:idx], children[idx+1:]...)
+ rev := rev.UpdateChildren(name, children, branch)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, postAnnouncement)
+ return rev
+
}
+ log.Errorf("cannot add to non-keyed container")
+
} else {
log.Errorf("cannot add to non-container field")
}
@@ -658,16 +663,21 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+// MakeBranchFunction is a type for function references intented to create a branch
type MakeBranchFunction func(*node) *Branch
+// MakeBranch creates a new branch for the provided transaction id
func (n *node) MakeBranch(txid string) *Branch {
- branchPoint := n.Branches[NONE].Latest
+ branchPoint := n.GetBranch(NONE).GetLatest()
branch := NewBranch(n, txid, branchPoint, true)
- n.Branches[txid] = branch
+ n.SetBranch(txid, branch)
return branch
}
+// DeleteBranch removes a branch with the specified id
func (n *node) DeleteBranch(txid string) {
+ n.Lock()
+ defer n.Unlock()
delete(n.Branches, txid)
}
@@ -684,19 +694,20 @@
return f
}
+// MergeBranch will integrate the contents of a transaction branch within the latest branch of a given node
func (n *node) MergeBranch(txid string, dryRun bool) (Revision, error) {
- srcBranch := n.Branches[txid]
- dstBranch := n.Branches[NONE]
+ srcBranch := n.GetBranch(txid)
+ dstBranch := n.GetBranch(NONE)
forkRev := srcBranch.Origin
- srcRev := srcBranch.Latest
- dstRev := dstBranch.Latest
+ srcRev := srcBranch.GetLatest()
+ dstRev := dstBranch.GetLatest()
rev, changes := Merge3Way(forkRev, srcRev, dstRev, n.mergeChild(txid, dryRun), dryRun)
if !dryRun {
- n.Root.MakeLatest(dstBranch, rev, changes)
- delete(n.Branches, txid)
+ n.makeLatest(dstBranch, rev, changes)
+ n.DeleteBranch(txid)
}
// TODO: return proper error when one occurs
@@ -707,8 +718,8 @@
//func (n *node) diff(hash1, hash2, txid string) {
// branch := n.Branches[txid]
-// rev1 := branch.get(hash1)
-// rev2 := branch.get(hash2)
+// rev1 := branch.GetHash(hash1)
+// rev2 := branch.GetHash(hash2)
//
// if rev1.GetHash() == rev2.GetHash() {
// // empty patch
@@ -740,14 +751,12 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-func (n *node) GetProxy(path string, exclusive bool) *Proxy {
- //r := NewRoot(n.Type, n.KvStore)
- //r.node = n
- //r.KvStore = n.KvStore
-
- return n.getProxy(path, path, exclusive)
+// CreateProxy returns a reference to a sub-tree of the data model
+func (n *node) CreateProxy(path string, exclusive bool) *Proxy {
+ return n.createProxy(path, path, exclusive)
}
-func (n *node) getProxy(path string, fullPath string, exclusive bool) *Proxy {
+
+func (n *node) createProxy(path string, fullPath string, exclusive bool) *Proxy {
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -755,7 +764,7 @@
return n.makeProxy(path, fullPath, exclusive)
}
- rev := n.Branches[NONE].Latest
+ rev := n.GetBranch(NONE).GetLatest()
partition := strings.SplitN(path, "/", 2)
name := partition[0]
if len(partition) < 2 {
@@ -765,7 +774,7 @@
}
field := ChildrenFields(n.Type)[name]
- if field != nil && field.IsContainer {
+ if field.IsContainer {
if path == "" {
log.Error("cannot proxy a container field")
} else if field.Key != "" {
@@ -778,31 +787,31 @@
}
keyValue := field.KeyFromStr(key)
var children []Revision
- for _, v := range rev.GetChildren()[name] {
- newV := reflect.ValueOf(v).Interface().(Revision)
- children = append(children, newV)
- }
+ children = make([]Revision, len(rev.GetChildren()[name]))
+ copy(children, rev.GetChildren()[name])
_, childRev := n.findRevByKey(children, field.Key, keyValue)
childNode := childRev.GetNode()
- return childNode.getProxy(path, fullPath, exclusive)
+ return childNode.createProxy(path, fullPath, exclusive)
} else {
log.Error("cannot index into container with no keys")
}
} else {
childRev := rev.GetChildren()[name][0]
childNode := childRev.GetNode()
- return childNode.getProxy(path, fullPath, exclusive)
+ return childNode.createProxy(path, fullPath, exclusive)
}
return nil
}
func (n *node) makeProxy(path string, fullPath string, exclusive bool) *Proxy {
+ n.Lock()
+ defer n.Unlock()
r := &root{
node: n,
- Callbacks: n.Root.Callbacks,
- NotificationCallbacks: n.Root.NotificationCallbacks,
+ Callbacks: n.Root.GetCallbacks(),
+ NotificationCallbacks: n.Root.GetNotificationCallbacks(),
DirtyNodes: n.Root.DirtyNodes,
KvStore: n.Root.KvStore,
Loading: n.Root.Loading,
@@ -810,16 +819,19 @@
}
if n.Proxy == nil {
- n.Proxy = NewProxy(r, path, fullPath, exclusive)
+ n.Proxy = NewProxy(r, n, path, fullPath, exclusive)
} else {
if n.Proxy.Exclusive {
log.Error("node is already owned exclusively")
}
}
+
return n.Proxy
}
func (n *node) makeEventBus() *EventBus {
+ n.Lock()
+ defer n.Unlock()
if n.EventBus == nil {
n.EventBus = NewEventBus()
}
@@ -828,14 +840,44 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Persistence Loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+// LoadLatest accesses the persistent storage to construct the data model based on the stored information
func (n *node) LoadLatest(hash string) {
branch := NewBranch(n, "", nil, n.AutoPrune)
pr := &PersistedRevision{}
- rev := pr.Load(branch, n.Root.KvStore, n.Type, hash)
+ rev := pr.Load(branch, n.GetRoot().KvStore, n.Type, hash)
n.makeLatest(branch, rev, nil)
- n.Branches[NONE] = branch
+ n.SetBranch(NONE, branch)
}
-func (n *node) ExecuteCallbacks() {
- n.Root.ExecuteCallbacks()
+func (n *node) SetProxy(proxy *Proxy) {
+ n.Lock()
+ defer n.Unlock()
+ n.Proxy = proxy
+}
+
+func (n *node) GetProxy() *Proxy {
+ n.Lock()
+ defer n.Unlock()
+ return n.Proxy
+}
+
+func (n *node) GetBranch(key string) *Branch {
+ n.Lock()
+ defer n.Unlock()
+ if branch, exists := n.Branches[key]; exists {
+ return branch
+ }
+ return nil
+}
+
+func (n *node) SetBranch(key string, branch *Branch) {
+ n.Lock()
+ defer n.Unlock()
+ n.Branches[key] = branch
+}
+
+func (n *node) GetRoot() *root {
+ n.Lock()
+ defer n.Unlock()
+ return n.Root
}
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 29cadf7..811e35d 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -15,6 +15,7 @@
*/
package model
+import "C"
import (
"bytes"
"crypto/md5"
@@ -22,13 +23,27 @@
"github.com/golang/protobuf/proto"
"reflect"
"sort"
+ "sync"
)
-var (
- RevisionCache = make(map[string]interface{})
-)
+type revCacheSingleton struct {
+ sync.RWMutex
+ Cache map[string]interface{}
+}
+
+var revCacheInstance *revCacheSingleton
+var revCacheOnce sync.Once
+
+func GetRevCache() *revCacheSingleton {
+ revCacheOnce.Do(func() {
+ revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
+ })
+ return revCacheInstance
+}
type NonPersistedRevision struct {
+ mutex sync.RWMutex
+ Root *root
Config *DataRevision
Children map[string][]Revision
Hash string
@@ -36,53 +51,73 @@
WeakRef string
}
-func NewNonPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
- cr := &NonPersistedRevision{}
- cr.Branch = branch
- cr.Config = NewDataRevision(data)
- cr.Children = children
- cr.Finalize()
-
- return cr
+func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ r := &NonPersistedRevision{}
+ r.Root = root
+ r.Branch = branch
+ r.Config = NewDataRevision(root, data)
+ r.Children = children
+ r.Finalize()
+ return r
}
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Config = config
}
func (npr *NonPersistedRevision) GetConfig() *DataRevision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Config
}
func (npr *NonPersistedRevision) SetChildren(children map[string][]Revision) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Children = children
}
func (npr *NonPersistedRevision) GetChildren() map[string][]Revision {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Children
}
func (npr *NonPersistedRevision) SetHash(hash string) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Hash = hash
}
func (npr *NonPersistedRevision) GetHash() string {
+ //npr.mutex.Lock()
+ //defer npr.mutex.Unlock()
return npr.Hash
}
func (npr *NonPersistedRevision) ClearHash() {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Hash = ""
}
func (npr *NonPersistedRevision) SetBranch(branch *Branch) {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
npr.Branch = branch
}
func (npr *NonPersistedRevision) GetBranch() *Branch {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Branch
}
func (npr *NonPersistedRevision) GetData() interface{} {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
if npr.Config == nil {
return nil
}
@@ -90,19 +125,24 @@
}
func (npr *NonPersistedRevision) GetNode() *node {
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
return npr.Branch.Node
}
func (npr *NonPersistedRevision) Finalize() {
- npr.SetHash(npr.hashContent())
+ GetRevCache().Lock()
+ defer GetRevCache().Unlock()
- if _, exists := RevisionCache[npr.Hash]; !exists {
- RevisionCache[npr.Hash] = npr
+ npr.Hash = npr.hashContent()
+
+ if _, exists := GetRevCache().Cache[npr.Hash]; !exists {
+ GetRevCache().Cache[npr.Hash] = npr
}
- if _, exists := RevisionCache[npr.Config.Hash]; !exists {
- RevisionCache[npr.Config.Hash] = npr.Config
+ if _, exists := GetRevCache().Cache[npr.Config.Hash]; !exists {
+ GetRevCache().Cache[npr.Config.Hash] = npr.Config
} else {
- npr.Config = RevisionCache[npr.Config.Hash].(*DataRevision)
+ npr.Config = GetRevCache().Cache[npr.Config.Hash].(*DataRevision)
}
}
@@ -114,12 +154,13 @@
buffer.WriteString(npr.Config.Hash)
}
- for key, _ := range npr.Children {
+ for key := range npr.Children {
childrenKeys = append(childrenKeys, key)
}
+
sort.Strings(childrenKeys)
- if npr.Children != nil && len(npr.Children) > 0 {
+ if len(npr.Children) > 0 {
// Loop through sorted Children keys
for _, key := range childrenKeys {
for _, child := range npr.Children[key] {
@@ -134,30 +175,38 @@
}
func (npr *NonPersistedRevision) Get(depth int) interface{} {
- originalData := npr.GetData()
- data := reflect.ValueOf(originalData).Interface()
+ // 1. Clone the data to avoid any concurrent access issues
+ // 2. The current rev might still be pointing to an old config
+ // thus, force the revision to get its latest value
+ latestRev := npr.GetBranch().GetLatest()
+ originalData := proto.Clone(latestRev.GetData().(proto.Message))
+
+ data := originalData
+ // Get back to the interface type
+ //data := reflect.ValueOf(originalData).Interface()
if depth != 0 {
- for fieldName, field := range ChildrenFields(npr.GetData()) {
+ for fieldName, field := range ChildrenFields(latestRev.GetData()) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
if field.IsContainer {
- for _, rev := range npr.Children[fieldName] {
+ for _, rev := range latestRev.GetChildren()[fieldName] {
childData := rev.Get(depth - 1)
foundEntry := false
for i := 0; i < childDataHolder.Len(); i++ {
- if reflect.DeepEqual(childDataHolder.Index(i).Interface(), childData) {
+ cdh_if := childDataHolder.Index(i).Interface()
+ if cdh_if.(proto.Message).String() == childData.(proto.Message).String() {
foundEntry = true
break
}
}
if !foundEntry {
- // avoid duplicates by adding if the child was not found in the holder
+ // avoid duplicates by adding it only if the child was not found in the holder
childDataHolder = reflect.Append(childDataHolder, reflect.ValueOf(childData))
}
}
} else {
- if revs := npr.Children[fieldName]; revs != nil && len(revs) > 0 {
- rev := npr.Children[fieldName][0]
+ if revs := latestRev.GetChildren()[fieldName]; revs != nil && len(revs) > 0 {
+ rev := latestRev.GetChildren()[fieldName][0]
if rev != nil {
childData := rev.Get(depth - 1)
if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
@@ -174,52 +223,81 @@
result := data
if result != nil {
- clone := proto.Clone(data.(proto.Message))
- result = reflect.ValueOf(clone).Interface()
+ // We need to send back a copy of the retrieved object
+ result = proto.Clone(data.(proto.Message))
}
return result
}
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
- // TODO: Need to keep the hash for the old revision.
- // TODO: This will allow us to get rid of the unnecessary data
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetConfig(NewDataRevision(data))
+ newRev := NonPersistedRevision{}
+ newRev.Config = NewDataRevision(npr.Root, data)
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
+
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
+ }
+
newRev.Finalize()
return &newRev
}
func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
- newChildren := make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
- for _, revisionEntry := range childrenEntry {
- newEntry := reflect.ValueOf(revisionEntry).Interface().(Revision)
- newChildren[entryName] = append(newChildren[entryName], newEntry)
- }
- }
- newChildren[name] = children
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetChildren(newChildren)
+ newRev := NonPersistedRevision{}
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = make([]Revision, len(childrenEntry))
+ copy(newRev.Children[entryName], childrenEntry)
+ }
+
+ newRev.Children[name] = make([]Revision, len(children))
+ copy(newRev.Children[name], children)
+
+ newRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
newRev.Finalize()
return &newRev
}
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
- newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
- newRev.SetBranch(branch)
- newRev.SetChildren(children)
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ newRev := &NonPersistedRevision{}
+ newRev.Config = npr.Config
+ newRev.Hash = npr.Hash
+ newRev.Branch = branch
+ newRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range npr.Children {
+ newRev.Children[entryName] = make([]Revision, len(childrenEntry))
+ copy(newRev.Children[entryName], childrenEntry)
+ }
newRev.Finalize()
- return &newRev
+ return newRev
}
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- //npr.SetConfig(nil)
+ GetRevCache().Lock()
+ defer GetRevCache().Unlock()
+
+ npr.mutex.Lock()
+ defer npr.mutex.Unlock()
+
+ if includeConfig {
+ delete(GetRevCache().Cache, npr.Config.Hash)
+ }
+ delete(GetRevCache().Cache, npr.Hash)
}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index b62c569..3682694 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -23,25 +24,30 @@
"github.com/opencord/voltha-go/common/log"
"io/ioutil"
"reflect"
+ "runtime/debug"
+ "sync"
"time"
)
+// PersistedRevision holds information of revision meant to be saved in a persistent storage
type PersistedRevision struct {
+ mutex sync.RWMutex
Revision
Compress bool
kvStore *Backend
}
+// NewPersistedRevision creates a new instance of a PersistentRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
- pr.kvStore = branch.Node.Root.KvStore
- pr.Revision = NewNonPersistedRevision(branch, data, children)
+ pr.kvStore = branch.Node.GetRoot().KvStore
+ pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
pr.Finalize()
return pr
}
+// Finalize is responsible of saving the revision in the persistent storage
func (pr *PersistedRevision) Finalize() {
- //pr.Revision.Finalize()
pr.store()
}
@@ -55,6 +61,7 @@
return
}
if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
+ log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
return
}
@@ -84,10 +91,17 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.GetHash(), blob)
+ if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
+ log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
+ string(blob))
+ } else {
+ log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
+ string(debug.Stack()))
+ }
}
}
+// Load retrieves a revision from th persistent storage
func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
blob, _ := kvStore.Get(hash)
@@ -132,13 +146,10 @@
return rev
}
-func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
- a = b
- return a
-}
-
+// storeConfig saves the data associated to a revision in the persistent storage
func (pr *PersistedRevision) storeConfig() {
if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
+ log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
return
}
if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
@@ -151,10 +162,19 @@
w.Close()
blob = b.Bytes()
}
- pr.kvStore.Put(pr.GetConfig().Hash, blob)
+
+ if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
+ log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
+ pr.GetConfig().Hash,
+ pr.GetConfig().Data)
+ } else {
+ log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
+ string(debug.Stack()))
+ }
}
}
+// loadConfig restores the data associated to a revision from the persistent storage
func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
blob, _ := kvStore.Get(hash)
start := time.Now()
@@ -185,9 +205,11 @@
return data.Interface()
}
+// UpdateData modifies the information in the data model and saves it in the persistent storage
func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
newNPR := pr.Revision.UpdateData(data, branch)
+ log.Debugf("Updating data %+v", data)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
@@ -199,6 +221,7 @@
return newPR
}
+// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
newNPR := pr.Revision.UpdateChildren(name, children, branch)
@@ -213,6 +236,7 @@
return newPR
}
+// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
newNPR := pr.Revision.UpdateAllChildren(children, branch)
@@ -230,6 +254,8 @@
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
if pr.kvStore != nil && txid == "" {
if includeConfig {
log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
@@ -252,4 +278,6 @@
}
log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
}
+
+ pr.Revision.Drop(txid, includeConfig)
}
diff --git a/db/model/profiling.go b/db/model/profiling.go
index b93d2fc..874b035 100644
--- a/db/model/profiling.go
+++ b/db/model/profiling.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -20,7 +21,9 @@
"sync"
)
+// Profiling is used to store performance details collected at runtime
type profiling struct {
+ sync.RWMutex
DatabaseRetrieveTime float64
DatabaseRetrieveCount int
InMemoryModelTime float64
@@ -31,37 +34,65 @@
InMemoryLockCount int
}
-var profiling_instance *profiling
-var profiling_once sync.Once
+var profilingInstance *profiling
+var profilingOnce sync.Once
+// GetProfiling returns a singleton instance of the Profiling structure
func GetProfiling() *profiling {
- profiling_once.Do(func() {
- profiling_instance = &profiling{}
+ profilingOnce.Do(func() {
+ profilingInstance = &profiling{}
})
- return profiling_instance
+ return profilingInstance
}
+// AddToDatabaseRetrieveTime appends a time period to retrieve data from the database
func (p *profiling) AddToDatabaseRetrieveTime(period float64) {
+ p.Lock()
+ defer p.Unlock()
+
p.DatabaseRetrieveTime += period
- p.DatabaseRetrieveCount += 1
+ p.DatabaseRetrieveCount++
}
+
+// AddToInMemoryModelTime appends a time period to construct/deconstruct data in memory
func (p *profiling) AddToInMemoryModelTime(period float64) {
+ p.Lock()
+ defer p.Unlock()
+
p.InMemoryModelTime += period
- p.InMemoryModelCount += 1
+ p.InMemoryModelCount++
}
+
+// AddToInMemoryProcessTime appends a time period to process data
func (p *profiling) AddToInMemoryProcessTime(period float64) {
+ p.Lock()
+ defer p.Unlock()
+
p.InMemoryProcessTime += period
}
+
+// AddToDatabaseStoreTime appends a time period to store data in the database
func (p *profiling) AddToDatabaseStoreTime(period float64) {
+ p.Lock()
+ defer p.Unlock()
+
p.DatabaseStoreTime += period
}
+// AddToInMemoryLockTime appends a time period when a code block was locked
func (p *profiling) AddToInMemoryLockTime(period float64) {
+ p.Lock()
+ defer p.Unlock()
+
p.InMemoryLockTime += period
- p.InMemoryLockCount += 1
+ p.InMemoryLockCount++
}
+// Reset initializes the profile counters
func (p *profiling) Reset() {
+ p.Lock()
+ defer p.Unlock()
+
p.DatabaseRetrieveTime = 0
p.DatabaseRetrieveCount = 0
p.InMemoryModelTime = 0
@@ -72,7 +103,11 @@
p.InMemoryLockCount = 0
}
+// Report will provide the current profile counter status
func (p *profiling) Report() {
+ p.Lock()
+ defer p.Unlock()
+
log.Infof("[ Profiling Report ]")
log.Infof("Database Retrieval : %f", p.DatabaseRetrieveTime)
log.Infof("Database Retrieval Count : %d", p.DatabaseRetrieveCount)
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 3827ff3..65da561 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -26,6 +27,7 @@
"sync"
)
+// OperationContext holds details on the information used during an operation
type OperationContext struct {
Path string
Data interface{}
@@ -33,6 +35,7 @@
ChildKey string
}
+// NewOperationContext instantiates a new OperationContext structure
func NewOperationContext(path string, data interface{}, fieldName string, childKey string) *OperationContext {
oc := &OperationContext{
Path: path,
@@ -43,27 +46,32 @@
return oc
}
+// Update applies new data to the context structure
func (oc *OperationContext) Update(data interface{}) *OperationContext {
oc.Data = data
return oc
}
+// Proxy holds the information for a specific location with the data model
type Proxy struct {
- sync.Mutex
+ sync.RWMutex
Root *root
+ Node *node
Path string
FullPath string
Exclusive bool
- Callbacks map[CallbackType]map[string]CallbackTuple
+ Callbacks map[CallbackType]map[string]*CallbackTuple
}
-func NewProxy(root *root, path string, fullPath string, exclusive bool) *Proxy {
- callbacks := make(map[CallbackType]map[string]CallbackTuple)
+// NewProxy instantiates a new proxy to a specific location
+func NewProxy(root *root, node *node, path string, fullPath string, exclusive bool) *Proxy {
+ callbacks := make(map[CallbackType]map[string]*CallbackTuple)
if fullPath == "/" {
fullPath = ""
}
p := &Proxy{
Root: root,
+ Node: node,
Exclusive: exclusive,
Path: path,
FullPath: fullPath,
@@ -72,9 +80,73 @@
return p
}
+// GetRoot returns the root attribute of the proxy
+func (p *Proxy) GetRoot() *root {
+ p.Lock()
+ defer p.Unlock()
+ return p.Root
+}
+
+// getPath returns the path attribute of the proxy
+func (p *Proxy) getPath() string {
+ p.Lock()
+ defer p.Unlock()
+ return p.Path
+}
+
+// getFullPath returns the full path attribute of the proxy
+func (p *Proxy) getFullPath() string {
+ p.Lock()
+ defer p.Unlock()
+ return p.FullPath
+}
+
+// getCallbacks returns the full list of callbacks associated to the proxy
+func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
+ p.Lock()
+ defer p.Unlock()
+ if cb, exists := p.Callbacks[callbackType]; exists {
+ return cb
+ }
+ return nil
+}
+
+// getCallback returns a specific callback matching the type and function hash
+func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
+ p.Lock()
+ defer p.Unlock()
+ if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
+ return tuple
+ }
+ return nil
+}
+
+// setCallbacks applies a callbacks list to a type
+func (p *Proxy) setCallbacks(callbackType CallbackType, callbacks map[string]*CallbackTuple) {
+ p.Lock()
+ defer p.Unlock()
+ p.Callbacks[callbackType] = callbacks
+}
+
+// setCallback applies a callback to a type and hash value
+func (p *Proxy) setCallback(callbackType CallbackType, funcHash string, tuple *CallbackTuple) {
+ p.Lock()
+ defer p.Unlock()
+ p.Callbacks[callbackType][funcHash] = tuple
+}
+
+// DeleteCallback removes a callback matching the type and hash
+func (p *Proxy) DeleteCallback(callbackType CallbackType, funcHash string) {
+ p.Lock()
+ defer p.Unlock()
+ delete(p.Callbacks[callbackType], funcHash)
+}
+
+// parseForControlledPath verifies if a proxy path matches a pattern
+// for locations that need to be access controlled.
func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
- // TODO: Add other path prefixes they may need control
- if strings.HasPrefix(path, "/devices") {
+ // TODO: Add other path prefixes that may need control
+ if strings.HasPrefix(path, "/devices") || strings.HasPrefix(path, "/logical_devices"){
split := strings.SplitN(path, "/", -1)
switch len(split) {
case 2:
@@ -91,28 +163,29 @@
return pathLock, controlled
}
+// Get will retrieve information from the data model at the specified path location
func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
if path == "/" {
- effectivePath = p.FullPath
+ effectivePath = p.getFullPath()
} else {
- effectivePath = p.FullPath + path
+ effectivePath = p.getFullPath() + path
}
pathLock, controlled := p.parseForControlledPath(effectivePath)
- var pac interface{}
- var exists bool
+ log.Debugf("Path: %s, Effective: %s, PathLock: %s", path, effectivePath, pathLock)
- p.Lock()
- if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
- defer GetProxyAccessControl().Cache.Delete(pathLock)
- }
- p.Unlock()
+ pac := PAC().ReservePath(effectivePath, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
- return pac.(ProxyAccessControl).Get(path, depth, deep, txid, controlled)
+ rv := pac.Get(path, depth, deep, txid, controlled)
+
+ return rv
}
+// Update will modify information in the data model at the specified location with the provided data
func (p *Proxy) Update(path string, data interface{}, strict bool, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
@@ -121,27 +194,54 @@
var fullPath string
var effectivePath string
if path == "/" {
- fullPath = p.Path
- effectivePath = p.FullPath
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
} else {
- fullPath = p.Path + path
- effectivePath = p.FullPath + path
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath()
}
pathLock, controlled := p.parseForControlledPath(effectivePath)
- var pac interface{}
- var exists bool
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
- p.Lock()
- if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
- defer GetProxyAccessControl().Cache.Delete(pathLock)
- }
- p.Unlock()
+ pac := PAC().ReservePath(effectivePath, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
- return pac.(ProxyAccessControl).Update(fullPath, data, strict, txid, controlled)
+ return pac.Update(fullPath, data, strict, txid, controlled)
}
+// AddWithID will insert new data at specified location.
+// This method also allows the user to specify the ID of the data entry to ensure
+// that access control is active while inserting the information.
+func (p *Proxy) AddWithID(path string, id string, data interface{}, txid string) interface{} {
+ if !strings.HasPrefix(path, "/") {
+ log.Errorf("invalid path: %s", path)
+ return nil
+ }
+ var fullPath string
+ var effectivePath string
+ if path == "/" {
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
+ } else {
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath() + path + "/" + id
+ }
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+
+ pac := PAC().ReservePath(path, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
+
+ return pac.Add(fullPath, data, txid, controlled)
+}
+
+// Add will insert new data at specified location.
func (p *Proxy) Add(path string, data interface{}, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
@@ -150,27 +250,25 @@
var fullPath string
var effectivePath string
if path == "/" {
- fullPath = p.Path
- effectivePath = p.FullPath
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
} else {
- fullPath = p.Path + path
- effectivePath = p.FullPath + path
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath() + path
}
pathLock, controlled := p.parseForControlledPath(effectivePath)
- var pac interface{}
- var exists bool
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
- p.Lock()
- if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
- defer GetProxyAccessControl().Cache.Delete(pathLock)
- }
- p.Unlock()
+ pac := PAC().ReservePath(path, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
- return pac.(ProxyAccessControl).Add(fullPath, data, txid, controlled)
+ return pac.Add(fullPath, data, txid, controlled)
}
+// Remove will delete an entry at the specified location
func (p *Proxy) Remove(path string, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
@@ -179,83 +277,99 @@
var fullPath string
var effectivePath string
if path == "/" {
- fullPath = p.Path
- effectivePath = p.FullPath
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
} else {
- fullPath = p.Path + path
- effectivePath = p.FullPath + path
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath() + path
}
pathLock, controlled := p.parseForControlledPath(effectivePath)
- var pac interface{}
- var exists bool
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
- p.Lock()
- if pac, exists = GetProxyAccessControl().Cache.LoadOrStore(path, NewProxyAccessControl(p, pathLock)); !exists {
- defer GetProxyAccessControl().Cache.Delete(pathLock)
- }
- p.Unlock()
+ pac := PAC().ReservePath(effectivePath, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+ pac.SetProxy(p)
- return pac.(ProxyAccessControl).Remove(fullPath, txid, controlled)
+ return pac.Remove(fullPath, txid, controlled)
}
+// OpenTransaction creates a new transaction branch to isolate operations made to the data model
func (p *Proxy) OpenTransaction() *Transaction {
- txid := p.Root.MakeTxBranch()
+ txid := p.GetRoot().MakeTxBranch()
return NewTransaction(p, txid)
}
+// commitTransaction will apply and merge modifications made in the transaction branch to the data model
func (p *Proxy) commitTransaction(txid string) {
- p.Root.FoldTxBranch(txid)
+ p.GetRoot().FoldTxBranch(txid)
}
+// cancelTransaction will terminate a transaction branch along will all changes within it
func (p *Proxy) cancelTransaction(txid string) {
- p.Root.DeleteTxBranch(txid)
+ p.GetRoot().DeleteTxBranch(txid)
}
+// CallbackFunction is a type used to define callback functions
type CallbackFunction func(args ...interface{}) interface{}
+
+// CallbackTuple holds the function and arguments details of a callback
type CallbackTuple struct {
callback CallbackFunction
args []interface{}
}
-func (tuple *CallbackTuple) Execute(contextArgs interface{}) interface{} {
+// Execute will process the a callback with its provided arguments
+func (tuple *CallbackTuple) Execute(contextArgs []interface{}) interface{} {
args := []interface{}{}
- args = append(args, tuple.args...)
- if contextArgs != nil {
- args = append(args, contextArgs)
+
+ for _, ta := range tuple.args {
+ args = append(args, ta)
}
+
+ if contextArgs != nil {
+ for _, ca := range contextArgs {
+ args = append(args, ca)
+ }
+ }
+
return tuple.callback(args...)
}
+// RegisterCallback associates a callback to the proxy
func (p *Proxy) RegisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
- if _, exists := p.Callbacks[callbackType]; !exists {
- p.Callbacks[callbackType] = make(map[string]CallbackTuple)
+ if p.getCallbacks(callbackType) == nil {
+ p.setCallbacks(callbackType, make(map[string]*CallbackTuple))
}
funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
log.Debugf("value of function: %s", funcName)
funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
- p.Callbacks[callbackType][funcHash] = CallbackTuple{callback, args}
+ p.setCallback(callbackType, funcHash, &CallbackTuple{callback, args})
}
+// UnregisterCallback removes references to a callback within a proxy
func (p *Proxy) UnregisterCallback(callbackType CallbackType, callback CallbackFunction, args ...interface{}) {
- if _, exists := p.Callbacks[callbackType]; !exists {
+ if p.getCallbacks(callbackType) == nil {
log.Errorf("no such callback type - %s", callbackType.String())
return
}
- // TODO: Not sure if this is the best way to do it.
+
funcName := runtime.FuncForPC(reflect.ValueOf(callback).Pointer()).Name()
- log.Debugf("value of function: %s", funcName)
funcHash := fmt.Sprintf("%x", md5.Sum([]byte(funcName)))[:12]
- if _, exists := p.Callbacks[callbackType][funcHash]; !exists {
+
+ log.Debugf("value of function: %s", funcName)
+
+ if p.getCallback(callbackType, funcHash) == nil {
log.Errorf("function with hash value: '%s' not registered with callback type: '%s'", funcHash, callbackType)
return
}
- delete(p.Callbacks[callbackType], funcHash)
+
+ p.DeleteCallback(callbackType, funcHash)
}
-func (p *Proxy) invoke(callback CallbackTuple, context ...interface{}) (result interface{}, err error) {
+func (p *Proxy) invoke(callback *CallbackTuple, context []interface{}) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
errStr := fmt.Sprintf("callback error occurred: %+v", r)
@@ -269,6 +383,7 @@
return result, err
}
+// InvokeCallbacks executes all callbacks associated to a specific type
func (p *Proxy) InvokeCallbacks(args ...interface{}) (result interface{}) {
callbackType := args[0].(CallbackType)
proceedOnError := args[1].(bool)
@@ -276,8 +391,9 @@
var err error
- if _, exists := p.Callbacks[callbackType]; exists {
- for _, callback := range p.Callbacks[callbackType] {
+ if callbacks := p.getCallbacks(callbackType); callbacks != nil {
+ p.Lock()
+ for _, callback := range callbacks {
if result, err = p.invoke(callback, context); err != nil {
if !proceedOnError {
log.Info("An error occurred. Stopping callback invocation")
@@ -286,6 +402,8 @@
log.Info("An error occurred. Invoking next callback")
}
}
+ p.Unlock()
}
+
return result
}
diff --git a/db/model/proxy_access_control.go b/db/model/proxy_access_control.go
index 7f4fad5..e9a4ffa 100644
--- a/db/model/proxy_access_control.go
+++ b/db/model/proxy_access_control.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -22,29 +23,60 @@
"time"
)
-type _singletonProxyAccessControl struct {
- Cache sync.Map
+type singletonProxyAccessControl struct {
+ sync.RWMutex
+ cache map[string]ProxyAccessControl
}
-var _instanceProxyAccessControl *_singletonProxyAccessControl
-var _onceProxyAccessControl sync.Once
+var instanceProxyAccessControl *singletonProxyAccessControl
+var onceProxyAccessControl sync.Once
-func GetProxyAccessControl() *_singletonProxyAccessControl {
- _onceProxyAccessControl.Do(func() {
- _instanceProxyAccessControl = &_singletonProxyAccessControl{}
+// PAC provides access to the proxy access control singleton instance
+func PAC() *singletonProxyAccessControl {
+ onceProxyAccessControl.Do(func() {
+ instanceProxyAccessControl = &singletonProxyAccessControl{cache: make(map[string]ProxyAccessControl)}
})
- return _instanceProxyAccessControl
+ return instanceProxyAccessControl
}
+// ReservePath will apply access control for a specific path within the model
+func (singleton *singletonProxyAccessControl) ReservePath(path string, proxy *Proxy, pathLock string) ProxyAccessControl {
+ singleton.Lock()
+ defer singleton.Unlock()
+ var pac ProxyAccessControl
+ var exists bool
+ if pac, exists = singleton.cache[path]; !exists {
+ pac = NewProxyAccessControl(proxy, pathLock)
+ singleton.cache[path] = pac
+ }
+
+ if exists {
+ log.Debugf("PAC exists for path: %s... re-using", path)
+ } else {
+ log.Debugf("PAC does not exists for path: %s... creating", path)
+ }
+ return pac
+}
+
+// ReleasePath will remove access control for a specific path within the model
+func (singleton *singletonProxyAccessControl) ReleasePath(pathLock string) {
+ singleton.Lock()
+ defer singleton.Unlock()
+ delete(singleton.cache, pathLock)
+}
+
+// ProxyAccessControl is the abstraction interface to the base proxyAccessControl structure
type ProxyAccessControl interface {
Get(path string, depth int, deep bool, txid string, control bool) interface{}
Update(path string, data interface{}, strict bool, txid string, control bool) interface{}
Add(path string, data interface{}, txid string, control bool) interface{}
Remove(path string, txid string, control bool) interface{}
+ SetProxy(proxy *Proxy)
}
+// proxyAccessControl holds details of the path and proxy that requires access control
type proxyAccessControl struct {
- //sync.Mutex
+ sync.RWMutex
Proxy *Proxy
PathLock chan struct{}
Path string
@@ -53,6 +85,7 @@
stop time.Time
}
+// NewProxyAccessControl creates a new instance of an access control structure
func NewProxyAccessControl(proxy *Proxy, path string) ProxyAccessControl {
return &proxyAccessControl{
Proxy: proxy,
@@ -61,58 +94,109 @@
}
}
+// lock will prevent access to a model path
func (pac *proxyAccessControl) lock() {
- log.CleanUp()
- log.Debugf("Before lock ... pac: %+v, stack = %s", pac, string(debug.Stack()))
pac.PathLock <- struct{}{}
- pac.start = time.Now()
- log.Debugf("Got lock ... pac: %+v, stack = %s", pac, string(debug.Stack()))
- //time.Sleep(1 * time.Second)
- log.Debugf("<<<<< %s >>>>>> locked, stack=%s", pac.Path, string(debug.Stack()))
-}
-func (pac *proxyAccessControl) unlock() {
- log.CleanUp()
- log.Debugf("Before unlock ... pac: %+v, stack = %s", pac, string(debug.Stack()))
- <-pac.PathLock
- pac.stop = time.Now()
- GetProfiling().AddToInMemoryLockTime(pac.stop.Sub(pac.start).Seconds())
- log.Debugf("Got unlock ... pac: %+v, stack = %s", pac, string(debug.Stack()))
- log.Debugf("<<<<< %s >>>>>> unlocked, stack=%s", pac.Path, string(debug.Stack()))
+ pac.setStart(time.Now())
}
+// unlock will release control of a model path
+func (pac *proxyAccessControl) unlock() {
+ <-pac.PathLock
+ pac.setStop(time.Now())
+ GetProfiling().AddToInMemoryLockTime(pac.getStop().Sub(pac.getStart()).Seconds())
+}
+
+// getStart is used for profiling purposes and returns the time at which access control was applied
+func (pac *proxyAccessControl) getStart() time.Time {
+ pac.Lock()
+ defer pac.Unlock()
+ return pac.start
+}
+
+// getStart is used for profiling purposes and returns the time at which access control was removed
+func (pac *proxyAccessControl) getStop() time.Time {
+ pac.Lock()
+ defer pac.Unlock()
+ return pac.stop
+}
+
+// getPath returns the access controlled path
+func (pac *proxyAccessControl) getPath() string {
+ pac.Lock()
+ defer pac.Unlock()
+ return pac.Path
+}
+
+// getProxy returns the proxy used to reach a specific location in the data model
+func (pac *proxyAccessControl) getProxy() *Proxy {
+ pac.Lock()
+ defer pac.Unlock()
+ return pac.Proxy
+}
+
+// setStart is for profiling purposes and applies a start time value at which access control was started
+func (pac *proxyAccessControl) setStart(time time.Time) {
+ pac.Lock()
+ defer pac.Unlock()
+ pac.start = time
+}
+
+// setStop is for profiling purposes and applies a stop time value at which access control was stopped
+func (pac *proxyAccessControl) setStop(time time.Time) {
+ pac.Lock()
+ defer pac.Unlock()
+ pac.stop = time
+}
+
+// SetProxy is used to changed the proxy object of an access controlled path
+func (pac *proxyAccessControl) SetProxy(proxy *Proxy) {
+ pac.Lock()
+ defer pac.Unlock()
+ pac.Proxy = proxy
+}
+
+// Get retrieves data linked to a data model path
func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
if control {
pac.lock()
defer pac.unlock()
log.Debugf("controlling get, stack = %s", string(debug.Stack()))
}
- pac.Proxy.Root.Proxy = pac.Proxy
- return pac.Proxy.Root.Get(path, "", depth, deep, txid)
+
+ // FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
+ // The data traversal through reflection currently corrupts the content
+
+ return pac.getProxy().GetRoot().Get(path, "", 0, deep, txid)
}
+
+// Update changes the content of the data model at the specified location with the provided data
func (pac *proxyAccessControl) Update(path string, data interface{}, strict bool, txid string, control bool) interface{} {
if control {
pac.lock()
defer pac.unlock()
log.Debugf("controlling update, stack = %s", string(debug.Stack()))
}
- pac.Proxy.Root.Proxy = pac.Proxy
- return pac.Proxy.Root.Update(path, data, strict, txid, nil)
+ return pac.getProxy().GetRoot().Update(path, data, strict, txid, nil).GetData()
}
+
+// Add creates a new data model entry at the specified location with the provided data
func (pac *proxyAccessControl) Add(path string, data interface{}, txid string, control bool) interface{} {
if control {
pac.lock()
defer pac.unlock()
log.Debugf("controlling add, stack = %s", string(debug.Stack()))
}
- pac.Proxy.Root.Proxy = pac.Proxy
- return pac.Proxy.Root.Add(path, data, txid, nil)
+ return pac.getProxy().GetRoot().Add(path, data, txid, nil).GetData()
+
}
+
+// Remove discards information linked to the data model path
func (pac *proxyAccessControl) Remove(path string, txid string, control bool) interface{} {
if control {
pac.lock()
defer pac.unlock()
log.Debugf("controlling remove, stack = %s", string(debug.Stack()))
}
- pac.Proxy.Root.Proxy = pac.Proxy
- return pac.Proxy.Root.Remove(path, txid, nil)
+ return pac.getProxy().GetRoot().Remove(path, txid, nil)
}
diff --git a/db/model/proxy_concurrency_test.go b/db/model/proxy_concurrency_test.go
index 85d385a..3ebe06a 100644
--- a/db/model/proxy_concurrency_test.go
+++ b/db/model/proxy_concurrency_test.go
@@ -40,8 +40,8 @@
)
func Test_ConcurrentProxy_1_1_Add_NewDevice(t *testing.T) {
- devIdBin, _ := uuid.New().MarshalBinary()
- devId = "0001" + hex.EncodeToString(devIdBin)[:12]
+ devIDBin, _ := uuid.New().MarshalBinary()
+ devID = "0001" + hex.EncodeToString(devIDBin)[:12]
preAddExecuted := false
postAddExecuted := false
@@ -49,14 +49,14 @@
modelTestConfig.RootProxy.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions", &preAddExecuted)
modelTestConfig.RootProxy.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions", &postAddExecuted)
- device.Id = devId
+ device.Id = devID
if added := modelTestConfig.RootProxy.Add("/devices", device, ""); added == nil {
t.Error("Failed to add device")
} else {
t.Logf("Added device : %+v", added)
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -74,9 +74,9 @@
func Test_ConcurrentProxy_1_Add_ExistingDevice(t *testing.T) {
t.Parallel()
- device.Id = devId
+ device.Id = devID
if added := modelTestConfig.RootProxy.Add("/devices", device, ""); added == nil {
- t.Logf("Successfully detected that the device already exists: %s", devId)
+ t.Logf("Successfully detected that the device already exists: %s", devID)
} else {
t.Errorf("A new device should not have been created : %+v", added)
}
@@ -97,7 +97,7 @@
func Test_ConcurrentProxy_Get_Update_DeviceAdminState(t *testing.T) {
t.Parallel()
- if retrieved := modelTestConfig.RootProxy.Get("/devices/"+devId, 1, false, ""); retrieved == nil {
+ if retrieved := modelTestConfig.RootProxy.Get("/devices/"+devID, 1, false, ""); retrieved == nil {
t.Error("Failed to get device")
} else {
retrieved.(*voltha.Device).AdminState = voltha.AdminState_DISABLED
@@ -116,12 +116,12 @@
"POST_UPDATE instructions (root proxy)", &postUpdateExecuted,
)
- if afterUpdate := modelTestConfig.RootProxy.Update("/devices/"+devId, retrieved, false, ""); afterUpdate == nil {
+ if afterUpdate := modelTestConfig.RootProxy.Update("/devices/"+devID, retrieved, false, ""); afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -140,8 +140,8 @@
func Test_ConcurrentProxy_Get_SingleDevice(t *testing.T) {
//t.Parallel()
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Errorf("Failed to find device : %s", devId)
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", devID)
} else {
djson, _ := json.Marshal(d)
t.Logf("Found device: %s", string(djson))
@@ -152,8 +152,8 @@
func Test_ConcurrentProxy_Get_SingleDeviceFlows(t *testing.T) {
t.Parallel()
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Errorf("Failed to find device : %s", devId)
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", devID)
} else {
djson, _ := json.Marshal(d)
t.Logf("Found device: %s", string(djson))
@@ -163,8 +163,8 @@
func Test_ConcurrentProxy_Get_SingleDevicePorts(t *testing.T) {
t.Parallel()
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId+"/ports", 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Errorf("Failed to find device : %s", devId)
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID+"/ports", 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", devID)
} else {
djson, _ := json.Marshal(d)
t.Logf("Found device: %s", string(djson))
@@ -175,7 +175,7 @@
func Test_ConcurrentProxy_Get_Update_DeviceFirmware(t *testing.T) {
t.Parallel()
- if retrieved := modelTestConfig.RootProxy.Get("/devices/"+devId, 1, false, ""); retrieved == nil {
+ if retrieved := modelTestConfig.RootProxy.Get("/devices/"+devID, 1, false, ""); retrieved == nil {
t.Error("Failed to get device")
} else {
var fwVersion int
@@ -183,7 +183,7 @@
fwVersion = 0
} else {
fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
- fwVersion += 1
+ fwVersion++
}
preUpdateExecuted := false
@@ -202,12 +202,12 @@
retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
- if afterUpdate := modelTestConfig.RootProxy.Update("/devices/"+devId, retrieved, false, ""); afterUpdate == nil {
+ if afterUpdate := modelTestConfig.RootProxy.Update("/devices/"+devID, retrieved, false, ""); afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -227,7 +227,7 @@
// t.Parallel()
//
// // Get a device proxy and update a specific port
-// //devflowsProxy := modelTestConfig.Root.GetProxy("/devices/"+devId+"/flows", false)
+// //devflowsProxy := modelTestConfig.Root.CreateProxy("/devices/"+devID+"/flows", false)
// flows := modelTestConfig.RootProxy.Get("/", 0, false, "")
// flows.([]interface{})[0].(*openflow_13.Flows).Items[0].TableId = 2244
//
@@ -245,26 +245,26 @@
// "POST_UPDATE instructions (flows proxy)", &postUpdateExecuted,
// )
//
-// kvFlows := modelTestConfig.RootProxy.Get("/devices/"+devId+"/flows", 0, false, "")
+// kvFlows := modelTestConfig.RootProxy.Get("/devices/"+devID+"/flows", 0, false, "")
//
// if reflect.DeepEqual(flows, kvFlows) {
// t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
// }
//
-// if updated := modelTestConfig.RootProxy.Update("/devices/"+devId+"/flows", flows.([]interface{})[0], false, ""); updated == nil {
+// if updated := modelTestConfig.RootProxy.Update("/devices/"+devID+"/flows", flows.([]interface{})[0], false, ""); updated == nil {
// t.Error("Failed to update flow")
// } else {
// t.Logf("Updated flows : %+v", updated)
// }
//
-// if d := modelTestConfig.RootProxy.Get("/devices/"+devId+"/flows", 0, false, ""); d == nil {
+// if d := modelTestConfig.RootProxy.Get("/devices/"+devID+"/flows", 0, false, ""); d == nil {
// t.Error("Failed to find updated flows (flows proxy)")
// } else {
// djson, _ := json.Marshal(d)
// t.Logf("Found flows (flows proxy): %s", string(djson))
// }
//
-// if d := modelTestConfig.RootProxy.Get("/devices/"+devId+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
+// if d := modelTestConfig.RootProxy.Get("/devices/"+devID+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
// t.Error("Failed to find updated flows (root proxy)")
// } else {
// djson, _ := json.Marshal(d)
@@ -294,16 +294,16 @@
// "POST_REMOVE instructions (root proxy)", &postRemoveExecuted,
// )
//
-// if removed := modelTestConfig.RootProxy.Remove("/devices/"+devId, ""); removed == nil {
+// if removed := modelTestConfig.RootProxy.Remove("/devices/"+devID, ""); removed == nil {
// t.Error("Failed to remove device")
// } else {
// t.Logf("Removed device : %+v", removed)
// }
-// if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+// if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); reflect.ValueOf(d).IsValid() {
// djson, _ := json.Marshal(d)
// t.Errorf("Device was not removed - %s", djson)
// } else {
-// t.Logf("Device was removed: %s", devId)
+// t.Logf("Device was removed: %s", devID)
// }
//
// if !preRemoveExecuted {
@@ -344,7 +344,7 @@
// b.Error("Failed to update device")
// } else {
// if d := modelTestConfig.RootProxy.Get("/devices/"+target.Id, 0, false, ""); !reflect.ValueOf(d).IsValid() {
-// b.Errorf("Failed to find device : %s", devId)
+// b.Errorf("Failed to find device : %s", devID)
// } else {
// //djson, _ := json.Marshal(d)
// //b.Logf("Checking updated device device: %s", string(djson))
@@ -369,7 +369,7 @@
//
// for n := 0; n < b.N; n++ {
// if d := modelTestConfig.RootProxy.Get("/devices/"+target.Id, 0, false, ""); !reflect.ValueOf(d).IsValid() {
-// b.Errorf("Failed to find device : %s", devId)
+// b.Errorf("Failed to find device : %s", devID)
// } else {
// //djson, _ := json.Marshal(d)
// //b.Logf("Found device: %s", string(djson))
@@ -401,7 +401,7 @@
fwVersion = 0
} else {
fwVersion, _ = strconv.Atoi(target.FirmwareVersion)
- fwVersion += 1
+ fwVersion++
}
target.FirmwareVersion = strconv.Itoa(fwVersion)
@@ -411,7 +411,7 @@
b.Error("Failed to update device")
} else {
if d := modelTestConfig.RootProxy.Get("/devices/"+target.Id, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- b.Errorf("Failed to find device : %s", devId)
+ b.Errorf("Failed to find device : %s", devID)
} else if d.(*voltha.Device).FirmwareVersion != target.FirmwareVersion {
b.Errorf("Firmware was not uptaded - expected: %s, actual: %s",
target.FirmwareVersion,
@@ -446,7 +446,7 @@
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if d := modelTestConfig.RootProxy.Get("/devices/"+target.Id, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- b.Errorf("Failed to find device : %s", devId)
+ b.Errorf("Failed to find device : %s", devID)
} else {
//djson, _ := json.Marshal(d)
//b.Logf("Found device: %s", string(djson))
diff --git a/db/model/proxy_load_test.go b/db/model/proxy_load_test.go
new file mode 100644
index 0000000..b388d97
--- /dev/null
+++ b/db/model/proxy_load_test.go
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "encoding/hex"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "math/rand"
+ "reflect"
+ "strconv"
+ "sync"
+ "testing"
+)
+
+var (
+ ltDevProxy *Proxy
+ plt *proxyLoadTest
+ tlog log.Logger
+)
+
+type proxyLoadChanges struct {
+ ID string
+ Before interface{}
+ After interface{}
+}
+type proxyLoadTest struct {
+ sync.RWMutex
+ addedDevices [] string
+ updatedFirmwares []proxyLoadChanges
+ updatedFlows []proxyLoadChanges
+ preAddExecuted bool
+ postAddExecuted bool
+ preUpdateExecuted bool
+ postUpdateExecuted bool
+}
+
+func (plt *proxyLoadTest) SetPreAddExecuted(status bool) {
+ plt.Lock()
+ defer plt.Unlock()
+ plt.preAddExecuted = status
+}
+func (plt *proxyLoadTest) SetPostAddExecuted(status bool) {
+ plt.Lock()
+ defer plt.Unlock()
+ plt.postAddExecuted = status
+}
+func (plt *proxyLoadTest) SetPreUpdateExecuted(status bool) {
+ plt.Lock()
+ defer plt.Unlock()
+ plt.preUpdateExecuted = status
+}
+func (plt *proxyLoadTest) SetPostUpdateExecuted(status bool) {
+ plt.Lock()
+ defer plt.Unlock()
+ plt.postUpdateExecuted = status
+}
+func init() {
+ tlog, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+ log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ defer log.CleanUp()
+
+ ltDevProxy = modelTestConfig.Root.node.CreateProxy("/", false)
+ // Register ADD instructions callbacks
+ plt = &proxyLoadTest{}
+
+ ltDevProxy.RegisterCallback(PRE_ADD, commonCallbackFunc, "PRE_ADD", plt.SetPreAddExecuted)
+ ltDevProxy.RegisterCallback(POST_ADD, commonCallbackFunc, "POST_ADD", plt.SetPostAddExecuted)
+
+ //// Register UPDATE instructions callbacks
+ ltDevProxy.RegisterCallback(PRE_UPDATE, commonCallbackFunc, "PRE_UPDATE", plt.SetPreUpdateExecuted)
+ ltDevProxy.RegisterCallback(POST_UPDATE, commonCallbackFunc, "POST_UPDATE", plt.SetPostUpdateExecuted)
+
+}
+
+func Benchmark_ProxyLoad_AddDevice(b *testing.B) {
+ defer GetProfiling().Report()
+ b.RunParallel(func(pb *testing.PB) {
+ b.Log("Started adding devices")
+ for pb.Next() {
+ ltPorts := []*voltha.Port{
+ {
+ PortNo: 123,
+ Label: "lt-port-0",
+ Type: voltha.Port_PON_OLT,
+ AdminState: common.AdminState_ENABLED,
+ OperStatus: common.OperStatus_ACTIVE,
+ DeviceId: "lt-port-0-device-id",
+ Peers: []*voltha.Port_PeerPort{},
+ },
+ }
+
+ ltStats := &openflow_13.OfpFlowStats{
+ Id: 1000,
+ }
+ ltFlows := &openflow_13.Flows{
+ Items: []*openflow_13.OfpFlowStats{ltStats},
+ }
+ ltDevice := &voltha.Device{
+ Id: "",
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ Flows: ltFlows,
+ Ports: ltPorts,
+ }
+
+ ltDevIDBin, _ := uuid.New().MarshalBinary()
+ ltDevID := "0001" + hex.EncodeToString(ltDevIDBin)[:12]
+ ltDevice.Id = ltDevID
+
+ plt.SetPreAddExecuted(false)
+ plt.SetPostAddExecuted(false)
+
+ var added interface{}
+ // Add the device
+ if added = ltDevProxy.AddWithID("/devices", ltDevID, ltDevice, ""); added == nil {
+ tlog.Errorf("Failed to add device: %+v", ltDevice)
+ continue
+ } else {
+ tlog.Infof("Device was added 1: %+v", added)
+ }
+
+ plt.Lock()
+ plt.addedDevices = append(plt.addedDevices, added.(*voltha.Device).Id)
+ plt.Unlock()
+ }
+ })
+
+ tlog.Infof("Number of added devices : %d", len(plt.addedDevices))
+}
+
+func Benchmark_ProxyLoad_UpdateFirmware(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ //for i:=0; i < b.N; i++ {
+
+ if len(plt.addedDevices) > 0 {
+ var target interface{}
+ randomID := plt.addedDevices[rand.Intn(len(plt.addedDevices))]
+ firmProxy := modelTestConfig.Root.node.CreateProxy("/", false)
+ if target = firmProxy.Get("/devices/"+randomID, 0, false,
+ ""); !reflect.ValueOf(target).IsValid() {
+ tlog.Errorf("Failed to find device: %s %+v", randomID, target)
+ continue
+ }
+
+ plt.SetPreUpdateExecuted(false)
+ plt.SetPostUpdateExecuted(false)
+ firmProxy.RegisterCallback(PRE_UPDATE, commonCallbackFunc, "PRE_UPDATE", plt.SetPreUpdateExecuted)
+ firmProxy.RegisterCallback(POST_UPDATE, commonCallbackFunc, "POST_UPDATE", plt.SetPostUpdateExecuted)
+
+ var fwVersion int
+
+ before := target.(*voltha.Device).FirmwareVersion
+ if target.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(target.(*voltha.Device).FirmwareVersion)
+ fwVersion++
+ }
+
+ target.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
+ after := target.(*voltha.Device).FirmwareVersion
+
+ var updated interface{}
+ if updated = firmProxy.Update("/devices/"+randomID, target.(*voltha.Device), false,
+ ""); updated == nil {
+ tlog.Errorf("Failed to update device: %+v", target)
+ continue
+ } else {
+ tlog.Infof("Device was updated : %+v", updated)
+
+ }
+
+ if d := firmProxy.Get("/devices/"+randomID, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ tlog.Errorf("Failed to get device: %s", randomID)
+ continue
+ } else if d.(*voltha.Device).FirmwareVersion == after {
+ tlog.Infof("Imm Device was updated with new value: %s %+v", randomID, d)
+ } else if d.(*voltha.Device).FirmwareVersion == before {
+ tlog.Errorf("Imm Device kept old value: %s %+v %+v", randomID, d, target)
+ } else {
+ tlog.Errorf("Imm Device has unknown value: %s %+v %+v", randomID, d, target)
+ }
+
+ plt.Lock()
+ plt.updatedFirmwares = append(
+ plt.updatedFirmwares,
+ proxyLoadChanges{ID: randomID, Before: before, After: after},
+ )
+ plt.Unlock()
+ }
+ }
+ })
+}
+
+func traverseBranches(revision Revision, depth int) {
+ if revision == nil {
+ return
+ }
+ prefix := strconv.Itoa(depth) + " ~~~~ "
+ for i := 0; i < depth; i++ {
+ prefix += " "
+ }
+
+ tlog.Debugf("%sRevision: %s %+v", prefix, revision.GetHash(), revision.GetData())
+
+ //for brIdx, brRev := range revision.GetBranch().Revisions {
+ // tlog.Debugf("%sbranchIndex: %s", prefix, brIdx)
+ // traverseBranches(brRev, depth+1)
+ //}
+ for childrenI, children := range revision.GetChildren() {
+ tlog.Debugf("%schildrenIndex: %s, length: %d", prefix, childrenI, len(children))
+
+ for _, subrev := range children {
+ //subrev.GetBranch().Latest
+ traverseBranches(subrev, depth+1)
+ }
+ }
+
+}
+func Benchmark_ProxyLoad_UpdateFlows(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ if len(plt.addedDevices) > 0 {
+ randomID := plt.addedDevices[rand.Intn(len(plt.addedDevices))]
+
+ flowsProxy := modelTestConfig.Root.node.CreateProxy("/devices/"+randomID+"/flows", false)
+ flows := flowsProxy.Get("/", 0, false, "")
+
+ before := flows.(*openflow_13.Flows).Items[0].TableId
+ flows.(*openflow_13.Flows).Items[0].TableId = uint32(rand.Intn(3000))
+ after := flows.(*openflow_13.Flows).Items[0].TableId
+
+ flowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback2,
+ )
+ flowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback2,
+ )
+
+ var updated interface{}
+ if updated = flowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ b.Errorf("Failed to update flows for device: %+v", flows)
+ } else {
+ tlog.Infof("Flows were updated : %+v", updated)
+ }
+ plt.Lock()
+ plt.updatedFlows = append(
+ plt.updatedFlows,
+ proxyLoadChanges{ID: randomID, Before: before, After: after},
+ )
+ plt.Unlock()
+ }
+ }
+ })
+}
+
+func Benchmark_ProxyLoad_GetDevices(b *testing.B) {
+ traverseBranches(ltDevProxy.Root.node.Branches[NONE].GetLatest(), 0)
+
+ for i := 0; i < len(plt.addedDevices); i++ {
+ devToGet := plt.addedDevices[i]
+ // Verify that the added device can now be retrieved
+ if d := ltDevProxy.Get("/devices/"+devToGet, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ tlog.Errorf("Failed to get device: %s", devToGet)
+ continue
+ } else {
+ tlog.Infof("Got device: %s %+v", devToGet, d)
+ }
+ }
+}
+
+func Benchmark_ProxyLoad_GetUpdatedFirmware(b *testing.B) {
+ for i := 0; i < len(plt.updatedFirmwares); i++ {
+ devToGet := plt.updatedFirmwares[i].ID
+ // Verify that the updated device can be retrieved and that the updates were actually applied
+ if d := ltDevProxy.Get("/devices/"+devToGet, 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ tlog.Errorf("Failed to get device: %s", devToGet)
+ continue
+ } else if d.(*voltha.Device).FirmwareVersion == plt.updatedFirmwares[i].After.(string) {
+ tlog.Infof("Device was updated with new value: %s %+v", devToGet, d)
+ } else if d.(*voltha.Device).FirmwareVersion == plt.updatedFirmwares[i].Before.(string) {
+ tlog.Errorf("Device kept old value: %s %+v %+v", devToGet, d, plt.updatedFirmwares[i])
+ } else {
+ tlog.Errorf("Device has unknown value: %s %+v %+v", devToGet, d, plt.updatedFirmwares[i])
+ }
+ }
+}
+
+func Benchmark_ProxyLoad_GetUpdatedFlows(b *testing.B) {
+ var d interface{}
+ for i := 0; i < len(plt.updatedFlows); i++ {
+ devToGet := plt.updatedFlows[i].ID
+ // Verify that the updated device can be retrieved and that the updates were actually applied
+ flowsProxy := modelTestConfig.Root.node.CreateProxy("/devices/"+devToGet+"/flows", false)
+ if d = flowsProxy.Get("/", 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ tlog.Errorf("Failed to get device flows: %s", devToGet)
+ continue
+ } else if d.(*openflow_13.Flows).Items[0].TableId == plt.updatedFlows[i].After.(uint32) {
+ tlog.Infof("Device was updated with new flow value: %s %+v", devToGet, d)
+ } else if d.(*openflow_13.Flows).Items[0].TableId == plt.updatedFlows[i].Before.(uint32) {
+ tlog.Errorf("Device kept old flow value: %s %+v %+v", devToGet, d, plt.updatedFlows[i])
+ } else {
+ tlog.Errorf("Device has unknown flow value: %s %+v %+v", devToGet, d, plt.updatedFlows[i])
+ }
+ //if d = ltDevProxy.Get("/devices/"+devToGet, 0, false,
+ // ""); !reflect.ValueOf(d).IsValid() {
+ // tlog.Errorf("Failed to get device: %s", devToGet)
+ // continue
+ //} else if d.(*voltha.Device).Flows.Items[0].TableId == plt.updatedFlows[i].After.(uint32) {
+ // tlog.Infof("Device was updated with new flow value: %s %+v", devToGet, d)
+ //} else if d.(*voltha.Device).Flows.Items[0].TableId == plt.updatedFlows[i].Before.(uint32) {
+ // tlog.Errorf("Device kept old flow value: %s %+v %+v", devToGet, d, plt.updatedFlows[i])
+ //} else {
+ // tlog.Errorf("Device has unknown flow value: %s %+v %+v", devToGet, d, plt.updatedFlows[i])
+ //}
+ }
+}
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 2d831c1..666eb3d 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -19,34 +19,46 @@
"encoding/hex"
"encoding/json"
"github.com/google/uuid"
- "github.com/opencord/voltha-go/protos/voltha"
"github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "math/rand"
"reflect"
"strconv"
"testing"
)
var (
+ ldevProxy *Proxy
+ devProxy *Proxy
+ flowProxy *Proxy
)
-func Test_Proxy_1_1_Add_NewDevice(t *testing.T) {
- devIdBin, _ := uuid.New().MarshalBinary()
- devId = "0001" + hex.EncodeToString(devIdBin)[:12]
+func init() {
+ ldevProxy = modelTestConfig.Root.node.CreateProxy("/", false)
+ devProxy = modelTestConfig.Root.node.CreateProxy("/", false)
+}
+
+func Test_Proxy_1_1_1_Add_NewDevice(t *testing.T) {
+ devIDBin, _ := uuid.New().MarshalBinary()
+ devID = "0001" + hex.EncodeToString(devIDBin)[:12]
+ device.Id = devID
preAddExecuted := false
postAddExecuted := false
- modelTestConfig.RootProxy.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions", &preAddExecuted)
- modelTestConfig.RootProxy.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions", &postAddExecuted)
+ // Register ADD instructions callbacks
+ devProxy.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions", &preAddExecuted)
+ devProxy.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions", &postAddExecuted)
- device.Id = devId
- if added := modelTestConfig.RootProxy.Add("/devices", device, ""); added == nil {
+ // Add the device
+ if added := devProxy.Add("/devices", device, ""); added == nil {
t.Error("Failed to add device")
} else {
t.Logf("Added device : %+v", added)
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ // Verify that the added device can now be retrieved
+ if d := devProxy.Get("/devices/"+devID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -61,31 +73,32 @@
}
}
-func Test_Proxy_1_2_Add_ExistingDevice(t *testing.T) {
- device.Id = devId
- if added := modelTestConfig.RootProxy.Add("/devices", device, ""); added == nil {
- t.Logf("Successfully detected that the device already exists: %s", devId)
+func Test_Proxy_1_1_2_Add_ExistingDevice(t *testing.T) {
+ device.Id = devID
+
+ if added := devProxy.Add("/devices", device, ""); added == nil {
+ t.Logf("Successfully detected that the device already exists: %s", devID)
} else {
t.Errorf("A new device should not have been created : %+v", added)
}
}
-func Test_Proxy_2_1_Get_AllDevices(t *testing.T) {
- devices := modelTestConfig.RootProxy.Get("/devices", 1, false, "")
+func Test_Proxy_1_2_1_Get_AllDevices(t *testing.T) {
+ devices := devProxy.Get("/devices", 1, false, "")
if len(devices.([]interface{})) == 0 {
t.Error("there are no available devices to retrieve")
} else {
// Save the target device id for later tests
- targetDeviceId = devices.([]interface{})[0].(*voltha.Device).Id
+ targetDevID = devices.([]interface{})[0].(*voltha.Device).Id
t.Logf("retrieved all devices: %+v", devices)
}
}
-func Test_Proxy_2_2_Get_SingleDevice(t *testing.T) {
- if d := modelTestConfig.RootProxy.Get("/devices/"+targetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
- t.Errorf("Failed to find device : %s", targetDeviceId)
+func Test_Proxy_1_2_2_Get_SingleDevice(t *testing.T) {
+ if d := devProxy.Get("/devices/"+targetDevID, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Errorf("Failed to find device : %s", targetDevID)
} else {
djson, _ := json.Marshal(d)
t.Logf("Found device: %s", string(djson))
@@ -93,49 +106,46 @@
}
-func Test_Proxy_3_1_Update_Device_WithRootProxy(t *testing.T) {
- if retrieved := modelTestConfig.RootProxy.Get("/devices/"+targetDeviceId, 1, false, ""); retrieved == nil {
+func Test_Proxy_1_3_1_Update_Device(t *testing.T) {
+ var fwVersion int
+ preUpdateExecuted := false
+ postUpdateExecuted := false
+
+ if retrieved := devProxy.Get("/devices/"+targetDevID, 1, false, ""); retrieved == nil {
t.Error("Failed to get device")
} else {
t.Logf("Found raw device (root proxy): %+v", retrieved)
- var fwVersion int
if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
fwVersion = 0
} else {
fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
- fwVersion += 1
+ fwVersion++
}
- preUpdateExecuted := false
- postUpdateExecuted := false
+ retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
- modelTestConfig.RootProxy.RegisterCallback(
+ devProxy.RegisterCallback(
PRE_UPDATE,
commonCallback,
"PRE_UPDATE instructions (root proxy)", &preUpdateExecuted,
)
- modelTestConfig.RootProxy.RegisterCallback(
+ devProxy.RegisterCallback(
POST_UPDATE,
commonCallback,
"POST_UPDATE instructions (root proxy)", &postUpdateExecuted,
)
- //cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
- //cloned.FirmwareVersion = strconv.Itoa(fwVersion)
- retrieved.(*voltha.Device).FirmwareVersion = strconv.Itoa(fwVersion)
- //t.Logf("Before update : %+v", cloned)
-
- if afterUpdate := modelTestConfig.RootProxy.Update("/devices/"+targetDeviceId, retrieved, false, ""); afterUpdate == nil {
+ if afterUpdate := devProxy.Update("/devices/"+targetDevID, retrieved, false, ""); afterUpdate == nil {
t.Error("Failed to update device")
} else {
- t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
+ t.Logf("Updated device : %+v", afterUpdate)
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+targetDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+
+ if d := devProxy.Get("/devices/"+targetDevID, 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
-
t.Logf("Found device (root proxy): %s raw: %+v", string(djson), d)
}
@@ -148,54 +158,46 @@
}
}
-func Test_Proxy_3_2_Update_Flow_WithSubProxy(t *testing.T) {
+func Test_Proxy_1_3_2_Update_DeviceFlows(t *testing.T) {
// Get a device proxy and update a specific port
- devflowsProxy := modelTestConfig.Root.GetProxy("/devices/"+devId+"/flows", false)
- flows := devflowsProxy.Get("/", 0, false, "")
- //flows.([]interface{})[0].(*openflow_13.Flows).Items[0].TableId = 2222
- //flows.([]interface{})[0].(*openflow_13.Flows).Items[0].TableId = 2244
+ flowProxy = modelTestConfig.Root.node.CreateProxy("/devices/"+devID+"/flows", false)
+ flows := flowProxy.Get("/", 0, false, "")
flows.(*openflow_13.Flows).Items[0].TableId = 2244
- t.Logf("before updated flows: %+v", flows)
-
- //devPortsProxy := modelTestConfig.RootProxy.node.GetProxy("/devices/"+devId+"/ports", false)
- //port123 := devPortsProxy.Get("/123", 0, false, "")
- //t.Logf("got ports: %+v", port123)
- //port123.(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
preUpdateExecuted := false
postUpdateExecuted := false
- devflowsProxy.RegisterCallback(
+ flowProxy.RegisterCallback(
PRE_UPDATE,
commonCallback,
"PRE_UPDATE instructions (flows proxy)", &preUpdateExecuted,
)
- devflowsProxy.RegisterCallback(
+ flowProxy.RegisterCallback(
POST_UPDATE,
commonCallback,
"POST_UPDATE instructions (flows proxy)", &postUpdateExecuted,
)
- kvFlows := devflowsProxy.Get("/", 0, false, "")
+ kvFlows := flowProxy.Get("/", 0, false, "")
if reflect.DeepEqual(flows, kvFlows) {
t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
}
- if updated := devflowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ if updated := flowProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
t.Error("Failed to update flow")
} else {
t.Logf("Updated flows : %+v", updated)
}
- if d := devflowsProxy.Get("/", 0, false, ""); d == nil {
+ if d := flowProxy.Get("/", 0, false, ""); d == nil {
t.Error("Failed to find updated flows (flows proxy)")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found flows (flows proxy): %s", string(djson))
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId+"/flows", 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := devProxy.Get("/devices/"+devID+"/flows", 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated flows (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -208,38 +210,9 @@
if !postUpdateExecuted {
t.Error("POST_UPDATE callback was not executed")
}
-
- //Get a device proxy and update all its ports
-
-
- //devProxy := modelTestConfig.RootProxy.GetProxy("/devices/"+devId, false)
- //ports := devProxy.Get("/ports", 0, false, "")
- //t.Logf("got ports: %+v", ports)
- //devProxy.RegisterCallback(POST_UPDATE, commonCallback, nil)
- //
- //ports.([]interface{})[0].(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
- //
- //devProxy.Update("/ports", ports, false, "")
- //updated := devProxy.Get("/ports", 0, false, "")
- //t.Logf("got updated ports: %+v", updated)
-
- //
- // Get a device proxy, retrieve all the ports and update a specific one
- //
-
- //devProxy := modelTestConfig.RootProxy.GetProxy("/devices/"+devId, false)
- //ports := devProxy.Get("/ports", 0, false, "")
- //t.Logf("got ports: %+v", ports)
- //devProxy.RegisterCallback(POST_UPDATE, commonCallback, nil)
- //
- //ports.([]interface{})[0].(*voltha.Port).OperStatus = common.OperStatus_DISCOVERED
- //
- //devProxy.Update("/ports/123", ports.([]interface{})[0], false, "")
- //updated := devProxy.Get("/ports", 0, false, "")
- //t.Logf("got updated ports: %+v", updated)
}
-func Test_Proxy_4_1_Remove_Device(t *testing.T) {
+func Test_Proxy_1_4_1_Remove_Device(t *testing.T) {
preRemoveExecuted := false
postRemoveExecuted := false
@@ -254,16 +227,214 @@
"POST_REMOVE instructions (root proxy)", &postRemoveExecuted,
)
- if removed := modelTestConfig.RootProxy.Remove("/devices/"+devId, ""); removed == nil {
+ if removed := modelTestConfig.RootProxy.Remove("/devices/"+devID, ""); removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
}
- if d := modelTestConfig.RootProxy.Get("/devices/"+devId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ if d := modelTestConfig.RootProxy.Get("/devices/"+devID, 0, false, ""); reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
- t.Logf("Device was removed: %s", devId)
+ t.Logf("Device was removed: %s", devID)
+ }
+
+ if !preRemoveExecuted {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !postRemoveExecuted {
+ t.Error("POST_UPDATE callback was not executed")
+ }
+}
+
+func Test_Proxy_2_1_1_Add_NewLogicalDevice(t *testing.T) {
+
+ ldIDBin, _ := uuid.New().MarshalBinary()
+ ldevID = "0001" + hex.EncodeToString(ldIDBin)[:12]
+ logicalDevice.Id = ldevID
+
+ preAddExecuted := false
+ postAddExecuted := false
+
+ // Register
+ ldevProxy.RegisterCallback(PRE_ADD, commonCallback, "PRE_ADD instructions", &preAddExecuted)
+ ldevProxy.RegisterCallback(POST_ADD, commonCallback, "POST_ADD instructions", &postAddExecuted)
+
+ if added := ldevProxy.Add("/logical_devices", logicalDevice, ""); added == nil {
+ t.Error("Failed to add logical device")
+ } else {
+ t.Logf("Added logical device : %+v", added)
+ }
+
+ if ld := ldevProxy.Get("/logical_devices/"+ldevID, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ t.Error("Failed to find added logical device")
+ } else {
+ ldJSON, _ := json.Marshal(ld)
+ t.Logf("Found logical device: %s", string(ldJSON))
+ }
+
+ if !preAddExecuted {
+ t.Error("PRE_ADD callback was not executed")
+ }
+ if !postAddExecuted {
+ t.Error("POST_ADD callback was not executed")
+ }
+}
+
+func Test_Proxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
+ logicalDevice.Id = ldevID
+ if added := ldevProxy.Add("/logical_devices", logicalDevice, ""); added == nil {
+ t.Logf("Successfully detected that the logical device already exists: %s", ldevID)
+ } else {
+ t.Errorf("A new logical device should not have been created : %+v", added)
+ }
+
+}
+
+func Test_Proxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
+ logicalDevices := ldevProxy.Get("/logical_devices", 1, false, "")
+
+ if len(logicalDevices.([]interface{})) == 0 {
+ t.Error("there are no available logical devices to retrieve")
+ } else {
+ // Save the target device id for later tests
+ targetLogDevID = logicalDevices.([]interface{})[0].(*voltha.LogicalDevice).Id
+ t.Logf("retrieved all logical devices: %+v", logicalDevices)
+ }
+}
+
+func Test_Proxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
+ if ld := ldevProxy.Get("/logical_devices/"+targetLogDevID, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ t.Errorf("Failed to find logical device : %s", targetLogDevID)
+ } else {
+ ldJSON, _ := json.Marshal(ld)
+ t.Logf("Found logical device: %s", string(ldJSON))
+ }
+
+}
+
+func Test_Proxy_2_3_1_Update_LogicalDevice(t *testing.T) {
+ var fwVersion int
+ preUpdateExecuted := false
+ postUpdateExecuted := false
+
+ if retrieved := ldevProxy.Get("/logical_devices/"+targetLogDevID, 1, false, ""); retrieved == nil {
+ t.Error("Failed to get logical device")
+ } else {
+ t.Logf("Found raw logical device (root proxy): %+v", retrieved)
+
+ if retrieved.(*voltha.LogicalDevice).RootDeviceId == "" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.LogicalDevice).RootDeviceId)
+ fwVersion++
+ }
+
+ ldevProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback,
+ "PRE_UPDATE instructions (root proxy)", &preUpdateExecuted,
+ )
+ ldevProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback,
+ "POST_UPDATE instructions (root proxy)", &postUpdateExecuted,
+ )
+
+ retrieved.(*voltha.LogicalDevice).RootDeviceId = strconv.Itoa(fwVersion)
+
+ if afterUpdate := ldevProxy.Update("/logical_devices/"+targetLogDevID, retrieved, false,
+ ""); afterUpdate == nil {
+ t.Error("Failed to update logical device")
+ } else {
+ t.Logf("Updated logical device : %+v", afterUpdate)
+ }
+ if d := ldevProxy.Get("/logical_devices/"+targetLogDevID, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated logical device (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+
+ t.Logf("Found logical device (root proxy): %s raw: %+v", string(djson), d)
+ }
+
+ if !preUpdateExecuted {
+ t.Error("PRE_UPDATE callback was not executed")
+ }
+ if !postUpdateExecuted {
+ t.Error("POST_UPDATE callback was not executed")
+ }
+ }
+}
+
+func Test_Proxy_2_3_2_Update_LogicalDeviceFlows(t *testing.T) {
+ // Get a device proxy and update a specific port
+ ldFlowsProxy := modelTestConfig.Root.node.CreateProxy("/logical_devices/"+ldevID+"/flows", false)
+ flows := ldFlowsProxy.Get("/", 0, false, "")
+ flows.(*openflow_13.Flows).Items[0].TableId = rand.Uint32()
+ t.Logf("before updated flows: %+v", flows)
+
+ ldFlowsProxy.RegisterCallback(
+ PRE_UPDATE,
+ commonCallback2,
+ )
+ ldFlowsProxy.RegisterCallback(
+ POST_UPDATE,
+ commonCallback2,
+ )
+
+ kvFlows := ldFlowsProxy.Get("/", 0, false, "")
+
+ if reflect.DeepEqual(flows, kvFlows) {
+ t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
+ }
+
+ if updated := ldFlowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ t.Error("Failed to update logical device flows")
+ } else {
+ t.Logf("Updated logical device flows : %+v", updated)
+ }
+
+ if d := ldFlowsProxy.Get("/", 0, false, ""); d == nil {
+ t.Error("Failed to find updated logical device flows (flows proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found flows (flows proxy): %s", string(djson))
+ }
+
+ if d := modelTestConfig.RootProxy.Get("/logical_devices/"+ldevID+"/flows", 0, false,
+ ""); !reflect.ValueOf(d).IsValid() {
+ t.Error("Failed to find updated logical device flows (root proxy)")
+ } else {
+ djson, _ := json.Marshal(d)
+ t.Logf("Found logical device flows (root proxy): %s", string(djson))
+ }
+}
+
+func Test_Proxy_2_4_1_Remove_Device(t *testing.T) {
+ preRemoveExecuted := false
+ postRemoveExecuted := false
+
+ ldevProxy.RegisterCallback(
+ PRE_REMOVE,
+ commonCallback,
+ "PRE_REMOVE instructions (root proxy)", &preRemoveExecuted,
+ )
+ ldevProxy.RegisterCallback(
+ POST_REMOVE,
+ commonCallback,
+ "POST_REMOVE instructions (root proxy)", &postRemoveExecuted,
+ )
+
+ if removed := ldevProxy.Remove("/logical_devices/"+ldevID, ""); removed == nil {
+ t.Error("Failed to remove logical device")
+ } else {
+ t.Logf("Removed device : %+v", removed)
+ }
+ if d := ldevProxy.Get("/logical_devices/"+ldevID, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ djson, _ := json.Marshal(d)
+ t.Errorf("Device was not removed - %s", djson)
+ } else {
+ t.Logf("Device was removed: %s", ldevID)
}
if !preRemoveExecuted {
diff --git a/db/model/revision_test.go b/db/model/revision_test.go
index 5a88ff3..a12bf26 100644
--- a/db/model/revision_test.go
+++ b/db/model/revision_test.go
@@ -24,7 +24,7 @@
branch := &Branch{}
data := &voltha.Device{}
children := make(map[string][]Revision)
- rev := NewNonPersistedRevision(branch, data, children)
+ rev := NewNonPersistedRevision(nil, branch, data, children)
t.Logf("New revision created: %+v\n", rev)
}
diff --git a/db/model/root.go b/db/model/root.go
index 45eafb9..a05fbdd 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -21,25 +22,36 @@
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "sync"
"time"
)
+// Root is used to provide an abstraction to the base root structure
type Root interface {
Node
+ Load(rootClass interface{}) *root
+
+ ExecuteCallbacks()
+ AddCallback(callback CallbackFunction, args ...interface{})
+ AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
+// root points to the top of the data model tree or sub-tree identified by a proxy
type root struct {
*node
Callbacks []CallbackTuple
NotificationCallbacks []CallbackTuple
- DirtyNodes map[string][]*node
- KvStore *Backend
- Loading bool
- RevisionClass interface{}
+ DirtyNodes map[string][]*node
+ KvStore *Backend
+ Loading bool
+ RevisionClass interface{}
+
+ mutex sync.RWMutex
}
+// NewRoot creates an new instance of a root object
func NewRoot(initialData interface{}, kvStore *Backend) *root {
root := &root{}
@@ -47,27 +59,34 @@
root.DirtyNodes = make(map[string][]*node)
root.Loading = false
+ // If there is no storage in place just revert to
+ // a non persistent mechanism
if kvStore != nil {
root.RevisionClass = reflect.TypeOf(PersistedRevision{})
} else {
root.RevisionClass = reflect.TypeOf(NonPersistedRevision{})
}
+
root.Callbacks = []CallbackTuple{}
root.NotificationCallbacks = []CallbackTuple{}
- root.node = NewNode(root, initialData,false, "")
+ root.node = NewNode(root, initialData, false, "")
return root
}
+// MakeTxBranch creates a new transaction branch
func (r *root) MakeTxBranch() string {
- txid_bin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txid_bin)[:12]
+ txidBin, _ := uuid.New().MarshalBinary()
+ txid := hex.EncodeToString(txidBin)[:12]
+
r.DirtyNodes[txid] = []*node{r.node}
r.node.MakeBranch(txid)
+
return txid
}
+// DeleteTxBranch removes a transaction branch
func (r *root) DeleteTxBranch(txid string) {
for _, dirtyNode := range r.DirtyNodes[txid] {
dirtyNode.DeleteBranch(txid)
@@ -75,16 +94,25 @@
delete(r.DirtyNodes, txid)
}
+// FoldTxBranch will merge the contents of a transaction branch with the root object
func (r *root) FoldTxBranch(txid string) {
- if _, err := r.MergeBranch(txid, true); err != nil {
+ // Start by doing a dry run of the merge
+ // If that fails, it bails out and the branch is deleted
+ if _, err := r.node.MergeBranch(txid, true); err != nil {
+ // Merge operation fails
r.DeleteTxBranch(txid)
} else {
- r.MergeBranch(txid, false)
+ r.node.MergeBranch(txid, false)
r.ExecuteCallbacks()
}
}
+// ExecuteCallbacks will invoke all the callbacks linked to root object
func (r *root) ExecuteCallbacks() {
+ r.mutex.Lock()
+ log.Debugf("ExecuteCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("ExecuteCallbacks released the ROOT lock : %+v", r)
for len(r.Callbacks) > 0 {
callback := r.Callbacks[0]
r.Callbacks = r.Callbacks[1:]
@@ -97,25 +125,55 @@
}
}
-func (r *root) HasCallbacks() bool {
+func (r *root) hasCallbacks() bool {
return len(r.Callbacks) == 0
}
+// getCallbacks returns the available callbacks
+func (r *root) GetCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ log.Debugf("getCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("getCallbacks released the ROOT lock : %+v", r)
+ return r.Callbacks
+}
+
+// getCallbacks returns the available notification callbacks
+func (r *root) GetNotificationCallbacks() []CallbackTuple {
+ r.mutex.Lock()
+ log.Debugf("GetNotificationCallbacks has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("GetNotificationCallbacks released the ROOT lock : %+v", r)
+ return r.NotificationCallbacks
+}
+
+// AddCallback inserts a new callback with its arguments
func (r *root) AddCallback(callback CallbackFunction, args ...interface{}) {
+ r.mutex.Lock()
+ log.Debugf("AddCallback has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("AddCallback released the ROOT lock : %+v", r)
r.Callbacks = append(r.Callbacks, CallbackTuple{callback, args})
}
+
+// AddNotificationCallback inserts a new notification callback with its arguments
func (r *root) AddNotificationCallback(callback CallbackFunction, args ...interface{}) {
+ r.mutex.Lock()
+ log.Debugf("AddNotificationCallback has the ROOT lock : %+v", r)
+ defer r.mutex.Unlock()
+ defer log.Debugf("AddNotificationCallback released the ROOT lock : %+v", r)
r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
+// Update modifies the content of an object at a given path with the provided data
func (r *root) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -129,19 +187,20 @@
result = r.node.Update(path, data, strict, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Add creates a new object at the given path with the provided data
func (r *root) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -155,19 +214,20 @@
result = r.node.Add(path, data, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Remove discards an object at a given path
func (r *root) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
- if makeBranch == nil {
+ if makeBranch != nil {
// TODO: raise error
}
- if r.HasCallbacks() {
+ if r.hasCallbacks() {
// TODO: raise error
}
@@ -181,33 +241,43 @@
result = r.node.Remove(path, "", nil)
}
- r.node.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
return result
}
+// Load retrieves data from a persistent storage
func (r *root) Load(rootClass interface{}) *root {
//fakeKvStore := &Backend{}
//root := NewRoot(rootClass, nil)
//root.KvStore = r.KvStore
- r.LoadFromPersistence(rootClass)
+ r.loadFromPersistence(rootClass)
return r
}
+// MakeLatest updates a branch with the latest node revision
func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.makeLatest(branch, revision, changeAnnouncement)
}
+func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
+ if r.RevisionClass.(reflect.Type) == reflect.TypeOf(PersistedRevision{}) {
+ return NewPersistedRevision(branch, data, children)
+ }
+
+ return NewNonPersistedRevision(r, branch, data, children)
+}
+
func (r *root) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.node.makeLatest(branch, revision, changeAnnouncement)
if r.KvStore != nil && branch.Txid == "" {
tags := make(map[string]string)
- for k, v := range r.Tags {
+ for k, v := range r.node.Tags {
tags[k] = v.GetHash()
}
data := &rootData{
- Latest: branch.Latest.GetHash(),
+ Latest: branch.GetLatest().GetHash(),
Tags: tags,
}
if blob, err := json.Marshal(data); err != nil {
@@ -226,7 +296,7 @@
Tags map[string]string `json:tags`
}
-func (r *root) LoadFromPersistence(rootClass interface{}) {
+func (r *root) loadFromPersistence(rootClass interface{}) {
var data rootData
r.Loading = true
@@ -239,10 +309,10 @@
stop := time.Now()
GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
for tag, hash := range data.Tags {
- r.LoadLatest(hash)
- r.Tags[tag] = r.Latest()
+ r.node.LoadLatest(hash)
+ r.node.Tags[tag] = r.node.Latest()
}
- r.LoadLatest(data.Latest)
+ r.node.LoadLatest(data.Latest)
r.Loading = false
}
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
index 7b438b2..0e49f33 100644
--- a/db/model/transaction_test.go
+++ b/db/model/transaction_test.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -26,8 +27,8 @@
)
var (
- txTargetDevId string
- txDevId string
+ txTargetDevID string
+ txDevID string
)
func Test_Transaction_1_GetDevices(t *testing.T) {
@@ -39,7 +40,7 @@
t.Error("there are no available devices to retrieve")
} else {
// Save the target device id for later tests
- txTargetDevId = devices.([]interface{})[0].(*voltha.Device).Id
+ txTargetDevID = devices.([]interface{})[0].(*voltha.Device).Id
t.Logf("retrieved devices: %+v", devices)
}
@@ -47,8 +48,8 @@
}
func Test_Transaction_2_AddDevice(t *testing.T) {
- devIdBin, _ := uuid.New().MarshalBinary()
- txDevId = "0001" + hex.EncodeToString(devIdBin)[:12]
+ devIDBin, _ := uuid.New().MarshalBinary()
+ txDevID = "0001" + hex.EncodeToString(devIDBin)[:12]
ports := []*voltha.Port{
{
@@ -63,7 +64,7 @@
}
device := &voltha.Device{
- Id: txDevId,
+ Id: txDevID,
Type: "simulated_olt",
Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
AdminState: voltha.AdminState_PREPROVISIONED,
@@ -75,6 +76,7 @@
if added := addTx.Add("/devices", device); added == nil {
t.Error("Failed to add device")
} else {
+ txTargetDevID = added.(*voltha.Device).Id
t.Logf("Added device : %+v", added)
}
addTx.Commit()
@@ -82,7 +84,7 @@
func Test_Transaction_3_GetDevice_PostAdd(t *testing.T) {
- basePath := "/devices/" + txDevId
+ basePath := "/devices/" + txDevID
getDevWithPortsTx := modelTestConfig.RootProxy.OpenTransaction()
device1 := getDevWithPortsTx.Get(basePath+"/ports", 1, false)
@@ -98,7 +100,7 @@
func Test_Transaction_4_UpdateDevice(t *testing.T) {
updateTx := modelTestConfig.RootProxy.OpenTransaction()
- if retrieved := updateTx.Get("/devices/"+txTargetDevId, 1, false); retrieved == nil {
+ if retrieved := updateTx.Get("/devices/"+txTargetDevID, 1, false); retrieved == nil {
t.Error("Failed to get device")
} else {
var fwVersion int
@@ -106,7 +108,7 @@
fwVersion = 0
} else {
fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
- fwVersion += 1
+ fwVersion++
}
cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
@@ -114,7 +116,7 @@
t.Logf("Before update : %+v", cloned)
// FIXME: The makeBranch passed in function is nil or not being executed properly!!!!!
- if afterUpdate := updateTx.Update("/devices/"+txTargetDevId, &cloned, false); afterUpdate == nil {
+ if afterUpdate := updateTx.Update("/devices/"+txTargetDevID, &cloned, false); afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
@@ -125,7 +127,7 @@
func Test_Transaction_5_GetDevice_PostUpdate(t *testing.T) {
- basePath := "/devices/" + txDevId
+ basePath := "/devices/" + txDevID
getDevWithPortsTx := modelTestConfig.RootProxy.OpenTransaction()
device1 := getDevWithPortsTx.Get(basePath+"/ports", 1, false)
@@ -141,7 +143,7 @@
func Test_Transaction_6_RemoveDevice(t *testing.T) {
removeTx := modelTestConfig.RootProxy.OpenTransaction()
- if removed := removeTx.Remove("/devices/" + txDevId); removed == nil {
+ if removed := removeTx.Remove("/devices/" + txDevID); removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -151,7 +153,7 @@
func Test_Transaction_7_GetDevice_PostRemove(t *testing.T) {
- basePath := "/devices/" + txDevId
+ basePath := "/devices/" + txDevID
getDevTx := modelTestConfig.RootProxy.OpenTransaction()
device := modelTestConfig.RootProxy.Get(basePath, 0, false, "")
diff --git a/db/model/utils.go b/db/model/utils.go
index 1a460ed..f0fd618 100644
--- a/db/model/utils.go
+++ b/db/model/utils.go
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package model
import (
@@ -22,6 +23,7 @@
"strings"
)
+// IsProtoMessage determines if the specified implements proto.Message type
func IsProtoMessage(object interface{}) bool {
var ok = false
@@ -32,6 +34,7 @@
return ok
}
+// FindOwnerType will traverse a data structure and find the parent type of the specified object
func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
prefix := ""
for d:=0; d< depth; d++ {
@@ -56,7 +59,7 @@
return obj.Type()
}
- for i := 0; i < obj.NumField(); i += 1 {
+ for i := 0; i < obj.NumField(); i++ {
v := reflect.Indirect(obj)
json := strings.Split(v.Type().Field(i).Tag.Get("json"), ",")
@@ -74,13 +77,13 @@
n := reflect.New(obj.Type())
n.Elem().Set(s)
- for i := 0; i < n.Elem().Len(); i += 1 {
+ for i := 0; i < n.Elem().Len(); i++ {
if found {
return reflect.ValueOf(n.Elem().Index(i).Interface()).Type()
}
}
- for i := 0; i < obj.Len(); i += 1 {
+ for i := 0; i < obj.Len(); i++ {
if found {
return obj.Index(i).Type()
}
@@ -96,6 +99,7 @@
return nil
}
+// FindKeyOwner will traverse a structure to find the owner type of the specified name
func FindKeyOwner(iface interface{}, name string, depth int) interface{} {
obj := reflect.ValueOf(iface)
k := obj.Kind()
@@ -126,7 +130,7 @@
n := reflect.New(obj.Type())
n.Elem().Set(s)
- for i := 0; i < n.Elem().Len(); i += 1 {
+ for i := 0; i < n.Elem().Len(); i++ {
if rc := FindKeyOwner(n.Elem().Index(i).Interface(), name, depth+1); rc != nil {
return rc
}
@@ -138,6 +142,7 @@
return nil
}
+// GetAttributeValue traverse a structure to find the value of an attribute
// FIXME: Need to figure out if GetAttributeValue and GetAttributeStructure can become one
// Code is repeated in both, but outputs have a different purpose
// Left as-is for now to get things working
@@ -181,7 +186,7 @@
n := reflect.New(obj.Type())
n.Elem().Set(s)
- for i := 0; i < obj.Len(); i += 1 {
+ for i := 0; i < obj.Len(); i++ {
if attribName, attribValue = GetAttributeValue(obj.Index(i).Interface(), name, depth+1); attribValue.IsValid() {
return attribName, attribValue
}
@@ -194,6 +199,7 @@
}
+// GetAttributeStructure will traverse a structure to find the data structure for the named attribute
// FIXME: See GetAttributeValue(...) comment
func GetAttributeStructure(data interface{}, name string, depth int) reflect.StructField {
var result reflect.StructField
@@ -234,7 +240,7 @@
n := reflect.New(obj.Type())
n.Elem().Set(s)
- for i := 0; i < obj.Len(); i += 1 {
+ for i := 0; i < obj.Len(); i++ {
if rc := GetAttributeStructure(obj.Index(i).Interface(), name, depth+1); rc.Name != "" {
return rc
}
@@ -248,7 +254,7 @@
}
-func Clone2(a interface{}) interface{} {
+func clone2(a interface{}) interface{} {
b := reflect.ValueOf(a)
buff := new(bytes.Buffer)
enc := gob.NewEncoder(buff)
@@ -259,7 +265,7 @@
return b.Interface()
}
-func Clone(a, b interface{}) interface{} {
+func clone(a, b interface{}) interface{} {
buff := new(bytes.Buffer)
enc := gob.NewEncoder(buff)
dec := gob.NewDecoder(buff)
diff --git a/db/model/utils_test.go b/db/model/utils_test.go
index c4cc60d..618e158 100644
--- a/db/model/utils_test.go
+++ b/db/model/utils_test.go
@@ -27,13 +27,13 @@
FirmwareVersion: "someversion",
}
b := &voltha.Device{}
- Clone(reflect.ValueOf(a).Interface(), b)
+ clone(reflect.ValueOf(a).Interface(), b)
t.Logf("A: %+v, B: %+v", a, b)
b.Id = "12345"
t.Logf("A: %+v, B: %+v", a, b)
var c *voltha.Device
- c = Clone2(a).(*voltha.Device)
+ c = clone2(a).(*voltha.Device)
t.Logf("A: %+v, C: %+v", a, c)
c.Id = "12345"
t.Logf("A: %+v, C: %+v", a, c)