VOL-1174: Ported transaction support to go data model
Change-Id: I4cabefac36c95f690aa121c71f36b6aaf41180b0
diff --git a/db/model/merge.go b/db/model/merge.go
index 97257b4..ef490d8 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -76,7 +76,7 @@
}
}
for v, _ := range changes.KeyMap1 {
- if _, ok := changes.KeyMap2[v]; ok && lst1[changes.KeyMap1[v]].GetHash() != lst1[changes.KeyMap2[v]].GetHash() {
+ if _, ok := changes.KeyMap2[v]; ok && lst1[changes.KeyMap1[v]].GetHash() != lst2[changes.KeyMap2[v]].GetHash() {
changes.ChangedKeys[v] = struct{}{}
}
}
@@ -100,7 +100,7 @@
configChanged = true
}
- newChildren := reflect.ValueOf(dstRev.GetChildren()).Elem().Interface().(map[string][]Revision)
+ newChildren := reflect.ValueOf(dstRev.GetChildren()).Interface().(map[string][]Revision)
childrenFields := ChildrenFields(forkRev.GetData())
for fieldName, field := range childrenFields {
@@ -108,7 +108,7 @@
srcList := srcRev.GetChildren()[fieldName]
dstList := dstRev.GetChildren()[fieldName]
- if revisionsAreEqual(forkList, srcList) {
+ if revisionsAreEqual(dstList, srcList) {
for _, rev := range srcList {
mergeChildFunc(rev)
}
@@ -126,7 +126,7 @@
if field.IsContainer {
changes = append(
changes, ChangeTuple{POST_LISTCHANGE,
- NewOperationContext("", nil, fieldName, "")},
+ NewOperationContext("", nil, fieldName, "")},
)
}
}
@@ -139,23 +139,30 @@
if revisionsAreEqual(dstList, forkList) {
src := newChangeAnalysis(forkList, srcList, field.Key)
- newList := reflect.ValueOf(srcList).Elem().Interface().([]Revision)
+ newList := reflect.ValueOf(srcList).Interface().([]Revision)
for key, _ := range src.AddedKeys {
- idx := src.KeyMap1[key]
+ idx := src.KeyMap2[key]
newRev := mergeChildFunc(newList[idx])
- newList[idx] = newRev
- changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
+ // FIXME: newRev may come back as nil... exclude those entries for now
+ if newRev != nil {
+ newList[idx] = newRev
+ changes = append(changes, ChangeTuple{POST_ADD, newRev.GetData()})
+ }
}
for key, _ := range src.RemovedKeys {
oldRev := forkList[src.KeyMap1[key]]
- changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
+ changes = append(changes, ChangeTuple{POST_REMOVE, oldRev.GetData()})
}
for key, _ := range src.ChangedKeys {
idx := src.KeyMap2[key]
newRev := mergeChildFunc(newList[idx])
- newList[idx] = newRev
+
+ // FIXME: newRev may come back as nil... exclude those entries for now
+ if newRev != nil {
+ newList[idx] = newRev
+ }
}
newChildren[fieldName] = newList
@@ -163,7 +170,7 @@
src := newChangeAnalysis(forkList, srcList, field.Key)
dst := newChangeAnalysis(forkList, dstList, field.Key)
- newList := reflect.ValueOf(dstList).Elem().Interface().([]Revision)
+ newList := reflect.ValueOf(dstList).Interface().([]Revision)
for key, _ := range src.AddedKeys {
if _, exists := dst.AddedKeys[key]; exists {
@@ -177,7 +184,7 @@
} else {
newRev := mergeChildFunc(srcList[src.KeyMap2[key]])
newList = append(newList, newRev)
- changes = append(changes, ChangeTuple{POST_ADD,newRev.GetData()})
+ changes = append(changes, ChangeTuple{POST_ADD, newRev.GetData()})
}
}
for key, _ := range src.ChangedKeys {
@@ -213,7 +220,7 @@
newList[len(newList)-1] = nil
newList = newList[:len(newList)-1]
- changes = append(changes, ChangeTuple{POST_REMOVE,oldRev.GetData()})
+ changes = append(changes, ChangeTuple{POST_REMOVE, oldRev.GetData()})
}
}
@@ -233,10 +240,11 @@
rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
if configChanged {
- changes = append(changes, ChangeTuple{POST_UPDATE,rev.GetData()})
+ changes = append(changes, ChangeTuple{POST_UPDATE, rev.GetData()})
}
return rev, changes
- }
+ } else {
+ return nil, nil
- return nil, nil
+ }
}
diff --git a/db/model/node.go b/db/model/node.go
index 77e0cc7..0a77fff 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -15,6 +15,9 @@
*/
package model
+// TODO: proper error handling
+// TODO: proper logging
+
import (
"fmt"
"github.com/golang/protobuf/proto"
@@ -590,7 +593,7 @@
childBranch := rev.GetBranch()
if childBranch.Txid == txid {
- rev = childBranch.Node.mergeTxBranch(txid, dryRun)
+ rev, _ = childBranch.Node.mergeTxBranch(txid, dryRun)
}
return rev
@@ -598,7 +601,7 @@
return f
}
-func (n *Node) mergeTxBranch(txid string, dryRun bool) Revision {
+func (n *Node) mergeTxBranch(txid string, dryRun bool) (Revision, error) {
srcBranch := n.Branches[txid]
dstBranch := n.Branches[NONE]
@@ -613,7 +616,8 @@
delete(n.Branches, txid)
}
- return rev
+ // TODO: return proper error when one occurs
+ return rev, nil
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 26d25a0..4a97941 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -188,7 +188,7 @@
}
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
- newRev := reflect.ValueOf(npr).Interface().(NonPersistedRevision)
+ newRev := reflect.ValueOf(npr).Elem().Interface().(NonPersistedRevision)
newRev.SetBranch(branch)
newRev.SetChildren(children)
newRev.Finalize()
diff --git a/db/model/proxy.go b/db/model/proxy.go
index 79087a3..4b7a59f 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -82,7 +82,7 @@
} else {
fullPath = p.Path + path
}
- return p.Node.Update(fullPath, data, strict, txid, nil)
+ return p.Root.Update(fullPath, data, strict, txid, nil)
}
func (p *Proxy) Add(path string, data interface{}, txid string) interface{} {
@@ -96,7 +96,7 @@
} else {
fullPath = p.Path + path
}
- return p.Node.Add(fullPath, data, txid, nil)
+ return p.Root.Add(fullPath, data, txid, nil)
}
func (p *Proxy) Remove(path string, txid string) interface{} {
@@ -110,7 +110,7 @@
} else {
fullPath = p.Path + path
}
- return p.Node.Remove(fullPath, txid, nil)
+ return p.Root.Remove(fullPath, txid, nil)
}
func (p *Proxy) openTransaction() *Transaction {
diff --git a/db/model/root.go b/db/model/root.go
index d10e9f1..b4885c1 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -27,7 +27,7 @@
type Root struct {
*Node
- DirtyNodes map[string]*Node
+ DirtyNodes map[string][]*Node
KvStore *Backend
Loading bool
RevisionClass interface{}
@@ -38,7 +38,7 @@
func NewRoot(initialData interface{}, kvStore *Backend, revisionClass interface{}) *Root {
root := &Root{}
root.KvStore = kvStore
- root.DirtyNodes = make(map[string]*Node)
+ root.DirtyNodes = make(map[string][]*Node)
root.Loading = false
if kvStore != nil /*&& TODO: RevisionClass is not a subclass of PersistedRevision ??? */ {
revisionClass = reflect.TypeOf(PersistedRevision{})
@@ -63,23 +63,23 @@
func (r *Root) MakeTxBranch() string {
txid_bin, _ := uuid.New().MarshalBinary()
txid := hex.EncodeToString(txid_bin)[:12]
- r.DirtyNodes[txid] = r.Node
+ r.DirtyNodes[txid] = []*Node{r.Node}
r.Node.makeTxBranch(txid)
return txid
}
func (r *Root) DeleteTxBranch(txid string) {
- for _, dirtyNode := range r.DirtyNodes {
+ for _, dirtyNode := range r.DirtyNodes[txid] {
dirtyNode.deleteTxBranch(txid)
}
delete(r.DirtyNodes, txid)
}
func (r *Root) FoldTxBranch(txid string) {
- if err := r.Node.mergeTxBranch(txid, true); err != nil {
+ if _, err := r.mergeTxBranch(txid, true); err != nil {
r.DeleteTxBranch(txid)
} else {
- r.Node.mergeTxBranch(txid, false)
+ r.mergeTxBranch(txid, false)
r.executeCallbacks()
}
}
@@ -120,10 +120,8 @@
}
if txid != "" {
- //dirtied := r.DirtyNodes[txid]
-
trackDirty := func(node *Node) *Branch {
- //dirtied.Add(Node)
+ r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.makeTxBranch(txid)
}
result = r.Node.Update(path, data, strict, txid, trackDirty)
@@ -148,10 +146,8 @@
}
if txid != "" {
- //dirtied := r.DirtyNodes[txid]
-
trackDirty := func(node *Node) *Branch {
- //dirtied.Add(Node)
+ r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.makeTxBranch(txid)
}
result = r.Node.Add(path, data, txid, trackDirty)
@@ -176,10 +172,8 @@
}
if txid != "" {
- //dirtied := r.DirtyNodes[txid]
-
trackDirty := func(node *Node) *Branch {
- //dirtied.Add(Node)
+ r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.makeTxBranch(txid)
}
result = r.Node.Remove(path, txid, trackDirty)
diff --git a/db/model/transaction.go b/db/model/transaction.go
index 1bed0d1..0ee3643 100644
--- a/db/model/transaction.go
+++ b/db/model/transaction.go
@@ -29,34 +29,34 @@
}
return tx
}
-func (t *Transaction) Get(path string, depth int, deep bool) Revision {
+func (t *Transaction) Get(path string, depth int, deep bool) interface{} {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
// TODO: need to review the return values at the different layers!!!!!
- return t.proxy.Get(path, depth, deep, t.txid).(Revision)
+ return t.proxy.Get(path, depth, deep, t.txid)
}
-func (t *Transaction) Update(path string, data interface{}, strict bool) Revision {
+func (t *Transaction) Update(path string, data interface{}, strict bool) interface{} {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Update(path, data, strict, t.txid).(Revision)
+ return t.proxy.Update(path, data, strict, t.txid)
}
-func (t *Transaction) Add(path string, data interface{}) Revision {
+func (t *Transaction) Add(path string, data interface{}) interface{} {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Add(path, data, t.txid).(Revision)
+ return t.proxy.Add(path, data, t.txid)
}
-func (t *Transaction) Remove(path string) Revision {
+func (t *Transaction) Remove(path string) interface{} {
if t.txid == "" {
fmt.Errorf("closed transaction")
return nil
}
- return t.proxy.Remove(path, t.txid).(Revision)
+ return t.proxy.Remove(path, t.txid)
}
func (t *Transaction) Cancel() {
t.proxy.cancelTransaction(t.txid)
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
new file mode 100644
index 0000000..4a0cc29
--- /dev/null
+++ b/db/model/transaction_test.go
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package model
+
+import (
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-go/common/log"
+ "testing"
+ "github.com/google/uuid"
+ "encoding/hex"
+ "strconv"
+ "reflect"
+)
+
+type transactionTest struct {
+ Root *Root
+ Backend *Backend
+ Proxy *Proxy
+ DbPrefix string
+ DbType string
+ DbHost string
+ DbPort int
+ DbTimeout int
+}
+
+var (
+ tx = &transactionTest{
+ DbPrefix: "service/voltha/data/core/0001",
+ DbType: "etcd",
+ //DbHost: "10.102.58.0",
+ DbHost: "localhost",
+ DbPort: 2379,
+ DbTimeout: 5,
+ }
+ txTargetDevId string
+ txDevId string
+)
+
+func init() {
+ if _, err := log.SetLogger(log.CONSOLE, 0, log.Fields{"instanceId": "transaction_test"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("cannot setup logging")
+ }
+ defer log.CleanUp()
+
+ tx.Backend = NewBackend(tx.DbType, tx.DbHost, tx.DbPort, tx.DbTimeout, tx.DbPrefix)
+
+ msgClass := &voltha.Voltha{}
+ root := NewRoot(msgClass, tx.Backend, nil)
+ tx.Root = root.Load(msgClass)
+
+ GetProfiling().Report()
+
+ tx.Proxy = tx.Root.Node.GetProxy("/", false)
+
+}
+
+func Test_Transaction_1_GetDevices(t *testing.T) {
+ getTx := tx.Proxy.openTransaction()
+
+ devices := getTx.Get("/devices", 1, false)
+
+ if len(devices.([]interface{})) == 0 {
+ t.Error("there are no available devices to retrieve")
+ } else {
+ // Save the target device id for later tests
+ txTargetDevId = devices.([]interface{})[0].(*voltha.Device).Id
+ t.Logf("retrieved devices: %+v", devices)
+ }
+
+ tx.Proxy.commitTransaction(getTx.txid)
+}
+
+func Test_Transaction_2_GetDevice(t *testing.T) {
+
+ basePath := "/devices/" + txTargetDevId
+
+ getDevWithPortsTx := tx.Proxy.openTransaction()
+ device1 := getDevWithPortsTx.Get(basePath+"/ports", 1, false)
+ t.Logf("retrieved device with ports: %+v", device1)
+ tx.Proxy.commitTransaction(getDevWithPortsTx.txid)
+
+ getDevTx := tx.Proxy.openTransaction()
+ device2 := getDevTx.Get(basePath, 0, false)
+ t.Logf("retrieved device: %+v", device2)
+ tx.Proxy.commitTransaction(getDevTx.txid)
+}
+
+func Test_Transaction_3_AddDevice(t *testing.T) {
+ devIdBin, _ := uuid.New().MarshalBinary()
+ txDevId = "0001" + hex.EncodeToString(devIdBin)[:12]
+
+ device := &voltha.Device{
+ Id: txDevId,
+ Type: "simulated_olt",
+ Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
+ AdminState: voltha.AdminState_PREPROVISIONED,
+ }
+
+ addTx := tx.Proxy.openTransaction()
+
+ if added := addTx.Add("/devices", device); added == nil {
+ t.Error("Failed to add device")
+ } else {
+ t.Logf("Added device : %+v", added)
+ }
+ tx.Proxy.commitTransaction(addTx.txid)
+}
+
+func Test_Transaction_4_UpdateDevice(t *testing.T) {
+ updateTx := tx.Proxy.openTransaction()
+ if retrieved := updateTx.Get("/devices/"+txTargetDevId, 1, false); retrieved == nil {
+ t.Error("Failed to get device")
+ } else {
+ var fwVersion int
+ if retrieved.(*voltha.Device).FirmwareVersion == "n/a" {
+ fwVersion = 0
+ } else {
+ fwVersion, _ = strconv.Atoi(retrieved.(*voltha.Device).FirmwareVersion)
+ fwVersion += 1
+ }
+
+ cloned := reflect.ValueOf(retrieved).Elem().Interface().(voltha.Device)
+ cloned.FirmwareVersion = strconv.Itoa(fwVersion)
+ t.Logf("Before update : %+v", cloned)
+
+ // FIXME: The makeBranch passed in function is nil or not being executed properly!!!!!
+ if afterUpdate := updateTx.Update("/devices/"+txTargetDevId, &cloned, false); afterUpdate == nil {
+ t.Error("Failed to update device")
+ } else {
+ t.Logf("Updated device : %+v", afterUpdate.(Revision).GetData())
+ }
+ }
+ tx.Proxy.commitTransaction(updateTx.txid)
+}
+
+func Test_Transaction_5_RemoveDevice(t *testing.T) {
+ removeTx := tx.Proxy.openTransaction()
+ if removed := removeTx.Remove("/devices/"+txDevId); removed == nil {
+ t.Error("Failed to remove device")
+ } else {
+ t.Logf("Removed device : %+v", removed)
+ }
+ tx.Proxy.commitTransaction(removeTx.txid)
+}