VOL-1774 Etcd Crash Handling
Change-Id: I1eeb726654c3972fd0a4fafae134607e5a810415
diff --git a/db/model/node.go b/db/model/node.go
index 264a9dd..7e703ff 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -41,11 +41,11 @@
// CRUD functions
Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
- Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
- List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error)
+ List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error)
Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
- CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy
+ CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error)
GetProxy() *Proxy
@@ -250,7 +250,7 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{} {
+func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) (interface{}, error) {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -278,18 +278,23 @@
var result interface{}
var prList []interface{}
- if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil {
+
+ pr, err := rev.LoadFromPersistence(ctx, path, txid, nil)
+ if err != nil {
+ log.Errorf("failed-to-load-from-persistence")
+ return nil, err
+ }
+ if pr != nil {
for _, revEntry := range pr {
prList = append(prList, revEntry.GetData())
}
result = prList
}
-
- return result
+ return result, nil
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) interface{} {
+func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) (interface{}, error) {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -328,13 +333,13 @@
"hash": hash,
"age": entryAge,
})
- return proto.Clone(entry.(Revision).GetData().(proto.Message))
+ return proto.Clone(entry.(Revision).GetData().(proto.Message)), nil
} else {
log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
}
} else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
log.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
- return result
+ return result, nil
} else {
log.Debugw("not-using-cache-entry", log.Fields{
"path": path,
@@ -354,7 +359,10 @@
// If we got to this point, we are either trying to reconcile with the db
// or we simply failed at getting information from memory
if n.Root.KvStore != nil {
- if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil && len(pr) > 0 {
+ if pr, err := rev.LoadFromPersistence(ctx, path, txid, nil); err != nil {
+ log.Errorf("failed-to-load-from-persistence")
+ return nil, err
+ } else if len(pr) > 0 {
// Did we receive a single or multiple revisions?
if len(pr) > 1 {
var revs []interface{}
@@ -367,8 +375,7 @@
}
}
}
-
- return result
+ return result, nil
}
//getPath traverses the specified path and retrieves the data associated to it
@@ -949,11 +956,11 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// CreateProxy returns a reference to a sub-tree of the data model
-func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error) {
return n.createProxy(ctx, path, path, n, exclusive)
}
-func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) (*Proxy, error) {
log.Debugw("node-create-proxy", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -965,7 +972,7 @@
path = path[1:]
}
if path == "" {
- return n.makeProxy(path, fullPath, parentNode, exclusive)
+ return n.makeProxy(path, fullPath, parentNode, exclusive), nil
}
rev := n.GetBranch(NONE).GetLatest()
@@ -998,7 +1005,7 @@
"name": name,
})
newNode := n.MakeNode(reflect.New(field.ClassType.Elem()).Interface(), "")
- return newNode.makeProxy(path, fullPath, parentNode, exclusive)
+ return newNode.makeProxy(path, fullPath, parentNode, exclusive), nil
} else if field.Key != "" {
log.Debugw("key-proxy", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
@@ -1026,7 +1033,10 @@
"fullPath": fullPath,
"name": name,
})
- } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); revs != nil && len(revs) > 0 {
+ } else if revs, err := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); err != nil {
+ log.Errorf("failed-to-load-from-persistence")
+ return nil, err
+ } else if revs != nil && len(revs) > 0 {
log.Debugw("found-revision-matching-key-in-db", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -1081,7 +1091,7 @@
"fullPath": fullPath,
"latest-rev": rev.GetHash(),
})
- return nil
+ return nil, nil
}
func (n *node) makeProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index 384caed..c7fb6ea 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -496,9 +496,9 @@
return npr.lastUpdate
}
-func (npr *NonPersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (npr *NonPersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
// stub... required by interface
- return nil
+ return nil, nil
}
func (npr *NonPersistedRevision) SetupWatch(key string) {
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index b5f1d09..b9aad44 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -518,13 +518,14 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
pr.mutex.Lock()
defer pr.mutex.Unlock()
log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
var response []Revision
+ var err error
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -533,7 +534,11 @@
if pr.kvStore != nil && path != "" {
if blobs == nil || len(blobs) == 0 {
log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
- blobs, _ = pr.kvStore.List(path)
+
+ if blobs, err = pr.kvStore.List(path); err != nil {
+ log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
}
partition := strings.SplitN(path, "/", 2)
@@ -622,5 +627,5 @@
}
}
- return response
+ return response, nil
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index b5378fe..a7eedda 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -214,7 +214,7 @@
// List will retrieve information from the data model at the specified path location
// A list operation will force access to persistence storage
-func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
+func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) (interface{}, error) {
var effectivePath string
if path == "/" {
effectivePath = p.getFullPath()
@@ -234,14 +234,11 @@
"controlled": controlled,
"operation": p.GetOperation(),
})
-
- rv := p.GetRoot().List(ctx, path, "", depth, deep, txid)
-
- return rv
+ return p.GetRoot().List(ctx, path, "", depth, deep, txid)
}
// Get will retrieve information from the data model at the specified path location
-func (p *Proxy) Get(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
+func (p *Proxy) Get(ctx context.Context, path string, depth int, deep bool, txid string) (interface{}, error) {
var effectivePath string
if path == "/" {
effectivePath = p.getFullPath()
@@ -262,16 +259,14 @@
"operation": p.GetOperation(),
})
- rv := p.GetRoot().Get(ctx, path, "", depth, deep, txid)
-
- return rv
+ return p.GetRoot().Get(ctx, path, "", depth, deep, txid)
}
// Update will modify information in the data model at the specified location with the provided data
-func (p *Proxy) Update(ctx context.Context, path string, data interface{}, strict bool, txid string) interface{} {
+func (p *Proxy) Update(ctx context.Context, path string, data interface{}, strict bool, txid string) (interface{}, error) {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
- return nil
+ return nil, fmt.Errorf("invalid path: %s", path)
}
var fullPath string
var effectivePath string
@@ -298,26 +293,29 @@
})
if p.GetRoot().KvStore != nil {
- p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ if _, err := p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
}
result := p.GetRoot().Update(ctx, fullPath, data, strict, txid, nil)
if result != nil {
- return result.GetData()
+ return result.GetData(), nil
}
- return nil
+ return nil, nil
}
// AddWithID will insert new data at specified location.
// This method also allows the user to specify the ID of the data entry to ensure
// that access control is active while inserting the information.
-func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data interface{}, txid string) interface{} {
+func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data interface{}, txid string) (interface{}, error) {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
- return nil
+ return nil, fmt.Errorf("invalid path: %s", path)
}
var fullPath string
var effectivePath string
@@ -344,24 +342,27 @@
})
if p.GetRoot().KvStore != nil {
- p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ if _, err := p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
}
result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
if result != nil {
- return result.GetData()
+ return result.GetData(), nil
}
- return nil
+ return nil, nil
}
// Add will insert new data at specified location.
-func (p *Proxy) Add(ctx context.Context, path string, data interface{}, txid string) interface{} {
+func (p *Proxy) Add(ctx context.Context, path string, data interface{}, txid string) (interface{}, error) {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
- return nil
+ return nil, fmt.Errorf("invalid path: %s", path)
}
var fullPath string
var effectivePath string
@@ -388,24 +389,27 @@
})
if p.GetRoot().KvStore != nil {
- p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ if _, err := p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
}
result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
if result != nil {
- return result.GetData()
+ return result.GetData(), nil
}
- return nil
+ return nil, nil
}
// Remove will delete an entry at the specified location
-func (p *Proxy) Remove(ctx context.Context, path string, txid string) interface{} {
+func (p *Proxy) Remove(ctx context.Context, path string, txid string) (interface{}, error) {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
- return nil
+ return nil, fmt.Errorf("invalid path: %s", path)
}
var fullPath string
var effectivePath string
@@ -432,24 +436,27 @@
})
if p.GetRoot().KvStore != nil {
- p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ if _, err := p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
}
result := p.GetRoot().Remove(ctx, fullPath, txid, nil)
if result != nil {
- return result.GetData()
+ return result.GetData(), nil
}
- return nil
+ return nil, nil
}
// CreateProxy to interact with specific path directly
-func (p *Proxy) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+func (p *Proxy) CreateProxy(ctx context.Context, path string, exclusive bool) (*Proxy, error) {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
- return nil
+ return nil, fmt.Errorf("invalid path: %s", path)
}
var fullPath string
@@ -477,10 +484,12 @@
})
if p.GetRoot().KvStore != nil {
- p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ if _, err := p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
+ return nil, err
+ }
defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
}
-
return p.GetRoot().CreateProxy(ctx, fullPath, exclusive)
}
diff --git a/db/model/proxy_load_test.go b/db/model/proxy_load_test.go
index 3f3327b..e5ae1c1 100644
--- a/db/model/proxy_load_test.go
+++ b/db/model/proxy_load_test.go
@@ -23,6 +23,7 @@
"github.com/opencord/voltha-protos/v2/go/common"
"github.com/opencord/voltha-protos/v2/go/openflow_13"
"github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/stretchr/testify/assert"
"math/rand"
"reflect"
"strconv"
@@ -81,6 +82,7 @@
}
func init() {
+ var err error
BenchmarkProxy_Root = NewRoot(&voltha.Voltha{}, nil)
BenchmarkProxy_Logger, _ = log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"})
@@ -96,7 +98,9 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
- BenchmarkProxy_DeviceProxy = BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
+ if BenchmarkProxy_DeviceProxy, err = BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create benchmark proxy")
+ }
// Register ADD instructions callbacks
BenchmarkProxy_PLT = &proxyLoadTest{}
@@ -110,6 +114,7 @@
}
func BenchmarkProxy_AddDevice(b *testing.B) {
+ var err error
defer GetProfiling().Report()
b.RunParallel(func(pb *testing.PB) {
b.Log("Started adding devices")
@@ -150,7 +155,10 @@
var added interface{}
// Add the device
- if added = BenchmarkProxy_DeviceProxy.AddWithID(context.Background(), "/devices", ltDevID, ltDevice, ""); added == nil {
+ if added, err = BenchmarkProxy_DeviceProxy.AddWithID(context.Background(), "/devices", ltDevID, ltDevice, ""); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create proxy")
+ }
+ if added == nil {
BenchmarkProxy_Logger.Errorf("Failed to add device: %+v", ltDevice)
continue
} else {
@@ -174,9 +182,17 @@
if len(BenchmarkProxy_PLT.addedDevices) > 0 {
var target interface{}
randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
- firmProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
- if target = firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
- ""); !reflect.ValueOf(target).IsValid() {
+ firmProxy, err := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create firmware proxy")
+ }
+ target, err = firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
+ "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to create target due to error %v", err)
+ assert.NotNil(b, err)
+ }
+ if !reflect.ValueOf(target).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to find device: %s %+v", randomID, target)
continue
}
@@ -200,8 +216,11 @@
after := target.(*voltha.Device).FirmwareVersion
var updated interface{}
- if updated = firmProxy.Update(context.Background(), "/devices/"+randomID, target.(*voltha.Device), false,
- ""); updated == nil {
+ if updated, err = firmProxy.Update(context.Background(), "/devices/"+randomID, target.(*voltha.Device), false, ""); err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update firmware proxy due to error %v", err)
+ assert.NotNil(b, err)
+ }
+ if updated == nil {
BenchmarkProxy_Logger.Errorf("Failed to update device: %+v", target)
continue
} else {
@@ -209,8 +228,12 @@
}
- if d := firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
- ""); !reflect.ValueOf(d).IsValid() {
+ d, err := firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from firmware proxy due to error %v", err)
+ assert.NotNil(b, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", randomID)
continue
} else if d.(*voltha.Device).FirmwareVersion == after {
@@ -264,8 +287,15 @@
if len(BenchmarkProxy_PLT.addedDevices) > 0 {
randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
- flowsProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/devices/"+randomID+"/flows", false)
- flows := flowsProxy.Get(context.Background(), "/", 0, false, "")
+ flowsProxy, err := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/devices/"+randomID+"/flows", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create flows proxy")
+ }
+ flows, err := flowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from flows proxy due to error: %v", err)
+ assert.NotNil(b, err)
+ }
before := flows.(*openflow_13.Flows).Items[0].TableId
flows.(*openflow_13.Flows).Items[0].TableId = uint32(rand.Intn(3000))
@@ -281,7 +311,11 @@
)
var updated interface{}
- if updated = flowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ if updated, err = flowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); err != nil {
+ BenchmarkProxy_Logger.Errorf("Cannot update flows proxy due to error: %v", err)
+ assert.NotNil(b, err)
+ }
+ if updated == nil {
b.Errorf("Failed to update flows for device: %+v", flows)
} else {
BenchmarkProxy_Logger.Infof("Flows were updated : %+v", updated)
@@ -303,8 +337,12 @@
for i := 0; i < len(BenchmarkProxy_PLT.addedDevices); i++ {
devToGet := BenchmarkProxy_PLT.addedDevices[i]
// Verify that the added device can now be retrieved
- if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
- ""); !reflect.ValueOf(d).IsValid() {
+ d, err := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from device proxy due to error: %v", err)
+ assert.NotNil(b, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
continue
} else {
@@ -317,8 +355,12 @@
for i := 0; i < len(BenchmarkProxy_PLT.updatedFirmwares); i++ {
devToGet := BenchmarkProxy_PLT.updatedFirmwares[i].ID
// Verify that the updated device can be retrieved and that the updates were actually applied
- if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
- ""); !reflect.ValueOf(d).IsValid() {
+ d, err := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from device proxy due to error: %v", err)
+ assert.NotNil(b, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
continue
} else if d.(*voltha.Device).FirmwareVersion == BenchmarkProxy_PLT.updatedFirmwares[i].After.(string) {
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index 785c65b..0ed8af9 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -21,9 +21,11 @@
"encoding/json"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/v2/go/common"
"github.com/opencord/voltha-protos/v2/go/openflow_13"
"github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/stretchr/testify/assert"
"math/rand"
"reflect"
"strconv"
@@ -53,10 +55,17 @@
func init() {
//log.AddPackage(log.JSON, log.InfoLevel, log.Fields{"instanceId": "DB_MODEL"})
//log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ var err error
TestProxy_Root = NewRoot(&voltha.Voltha{}, nil)
- TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy(context.Background(), "/", false)
- TestProxy_Root_Device = TestProxy_Root.CreateProxy(context.Background(), "/", false)
- TestProxy_Root_Adapter = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+ if TestProxy_Root_LogicalDevice, err = TestProxy_Root.CreateProxy(context.Background(), "/", false); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create logical device proxy")
+ }
+ if TestProxy_Root_Device, err = TestProxy_Root.CreateProxy(context.Background(), "/", false); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create device proxy")
+ }
+ if TestProxy_Root_Adapter, err = TestProxy_Root.CreateProxy(context.Background(), "/", false); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create adapter proxy")
+ }
TestProxy_LogicalPorts = []*voltha.LogicalPort{
{
@@ -116,7 +125,10 @@
postAddExecuted := make(chan struct{})
preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
- devicesProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices", false)
+ devicesProxy, err := TestProxy_Root.node.CreateProxy(context.Background(), "/devices", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create devices proxy")
+ }
devicesProxy.RegisterCallback(PRE_ADD, commonCallback2, "PRE_ADD Device container changes")
devicesProxy.RegisterCallback(POST_ADD, commonCallback2, "POST_ADD Device container changes")
@@ -124,7 +136,12 @@
TestProxy_Root_Device.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
TestProxy_Root_Device.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
- if added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, ""); added == nil {
+ added, err := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add test proxy device due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if added == nil {
t.Error("Failed to add device")
} else {
t.Logf("Added device : %+v", added)
@@ -138,7 +155,12 @@
}
// Verify that the added device can now be retrieved
- if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed get device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -149,7 +171,11 @@
func TestProxy_1_1_2_Add_ExistingDevice(t *testing.T) {
TestProxy_Device.Id = TestProxy_DeviceId
- added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, "")
+ added, err := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add device to test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if added.(proto.Message).String() != reflect.ValueOf(TestProxy_Device).Interface().(proto.Message).String() {
t.Errorf("Devices don't match - existing: %+v returned: %+v", TestProxy_LogicalDevice, added)
}
@@ -181,7 +207,12 @@
TestProxy_Root_Adapter.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions for adapters", &postAddExecutedPtr)
// Add the adapter
- if added := TestProxy_Root_Adapter.Add(context.Background(), "/adapters", TestProxy_Adapter, ""); added == nil {
+ added, err := TestProxy_Root_Adapter.Add(context.Background(), "/adapters", TestProxy_Adapter, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add adapter to test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if added == nil {
t.Error("Failed to add adapter")
} else {
t.Logf("Added adapter : %+v", added)
@@ -190,7 +221,12 @@
verifyGotResponse(postAddExecuted)
// Verify that the added device can now be retrieved
- if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to retrieve device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added adapter")
} else {
djson, _ := json.Marshal(d)
@@ -206,8 +242,11 @@
}
func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
- devices := TestProxy_Root_Device.Get(context.Background(), "/devices", 1, false, "")
-
+ devices, err := TestProxy_Root_Device.Get(context.Background(), "/devices", 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get all devices info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if len(devices.([]interface{})) == 0 {
t.Error("there are no available devices to retrieve")
} else {
@@ -218,7 +257,12 @@
}
func TestProxy_1_2_2_Get_SingleDevice(t *testing.T) {
- if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get single device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Errorf("Failed to find device : %s", TestProxy_TargetDeviceId)
} else {
djson, _ := json.Marshal(d)
@@ -233,7 +277,12 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- if retrieved := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); retrieved == nil {
+ retrieved, err := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if retrieved == nil {
t.Error("Failed to get device")
} else {
t.Logf("Found raw device (root proxy): %+v", retrieved)
@@ -258,7 +307,12 @@
"POST_UPDATE instructions (root proxy)", &postUpdateExecutedPtr,
)
- if afterUpdate := TestProxy_Root_Device.Update(context.Background(), "/devices/"+TestProxy_TargetDeviceId, retrieved, false, ""); afterUpdate == nil {
+ afterUpdate, err := TestProxy_Root_Device.Update(context.Background(), "/devices/"+TestProxy_TargetDeviceId, retrieved, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update device info test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate)
@@ -271,7 +325,12 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from test proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -282,8 +341,15 @@
func TestProxy_1_3_2_Update_DeviceFlows(t *testing.T) {
// Get a device proxy and update a specific port
- devFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", false)
- flows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ devFlowsProxy, err := TestProxy_Root.node.CreateProxy(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create device flows proxy")
+ }
+ flows, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
flows.(*openflow_13.Flows).Items[0].TableId = 2244
preUpdateExecuted := make(chan struct{})
@@ -301,13 +367,22 @@
"POST_UPDATE instructions (flows proxy)", &postUpdateExecutedPtr,
)
- kvFlows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ kvFlows, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if reflect.DeepEqual(flows, kvFlows) {
t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
}
- if updated := devFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ updated, err := devFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update flows in device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if updated == nil {
t.Error("Failed to update flow")
} else {
t.Logf("Updated flows : %+v", updated)
@@ -320,14 +395,24 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := devFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
+ d, err := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows in device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if d == nil {
t.Error("Failed to find updated flows (flows proxy)")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found flows (flows proxy): %s", string(djson))
}
- if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err = TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated flows (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -340,9 +425,16 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- adaptersProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/adapters", false)
-
- if retrieved := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
+ adaptersProxy, err := TestProxy_Root.node.CreateProxy(context.Background(), "/adapters", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create adapters proxy")
+ }
+ retrieved, err := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to retrieve adapter info from adapters proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if retrieved == nil {
t.Error("Failed to get adapter")
} else {
t.Logf("Found raw adapter (root proxy): %+v", retrieved)
@@ -360,7 +452,12 @@
"POST_UPDATE instructions for adapters", &postUpdateExecutedPtr,
)
- if afterUpdate := adaptersProxy.Update(context.Background(), "/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
+ afterUpdate, err := adaptersProxy.Update(context.Background(), "/"+TestProxy_AdapterId, retrieved, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update adapter info in adapters proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if afterUpdate == nil {
t.Error("Failed to update adapter")
} else {
t.Logf("Updated adapter : %+v", afterUpdate)
@@ -373,7 +470,12 @@
t.Error("POST_UPDATE callback for adapter was not executed")
}
- if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get updated adapter info from adapters proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated adapter (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -398,7 +500,12 @@
"POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
)
- if removed := TestProxy_Root_Device.Remove(context.Background(), "/devices/"+TestProxy_DeviceId, ""); removed == nil {
+ removed, err := TestProxy_Root_Device.Remove(context.Background(), "/devices/"+TestProxy_DeviceId, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to remove device from devices proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -411,7 +518,12 @@
t.Error("POST_REMOVE callback was not executed")
}
- if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get device info from devices proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
@@ -433,7 +545,12 @@
TestProxy_Root_LogicalDevice.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
TestProxy_Root_LogicalDevice.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
- if added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, ""); added == nil {
+ added, err := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add new logical device into proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if added == nil {
t.Error("Failed to add logical device")
} else {
t.Logf("Added logical device : %+v", added)
@@ -441,7 +558,12 @@
verifyGotResponse(postAddExecuted)
- if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ ld, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get logical device info from logical device proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(ld).IsValid() {
t.Error("Failed to find added logical device")
} else {
ldJSON, _ := json.Marshal(ld)
@@ -459,15 +581,22 @@
func TestProxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
TestProxy_LogicalDevice.Id = TestProxy_LogicalDeviceId
- added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, "")
+ added, err := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to add logical device due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if added.(proto.Message).String() != reflect.ValueOf(TestProxy_LogicalDevice).Interface().(proto.Message).String() {
t.Errorf("Logical devices don't match - existing: %+v returned: %+v", TestProxy_LogicalDevice, added)
}
}
func TestProxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
- logicalDevices := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices", 1, false, "")
-
+ logicalDevices, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices", 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get all logical devices from proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if len(logicalDevices.([]interface{})) == 0 {
t.Error("there are no available logical devices to retrieve")
} else {
@@ -478,7 +607,12 @@
}
func TestProxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
- if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ ld, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get single logical device from proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(ld).IsValid() {
t.Errorf("Failed to find logical device : %s", TestProxy_TargetLogicalDeviceId)
} else {
ldJSON, _ := json.Marshal(ld)
@@ -493,7 +627,12 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- if retrieved := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); retrieved == nil {
+ retrieved, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get logical devices due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if retrieved == nil {
t.Error("Failed to get logical device")
} else {
t.Logf("Found raw logical device (root proxy): %+v", retrieved)
@@ -518,8 +657,13 @@
retrieved.(*voltha.LogicalDevice).RootDeviceId = strconv.Itoa(fwVersion)
- if afterUpdate := TestProxy_Root_LogicalDevice.Update(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, retrieved, false,
- ""); afterUpdate == nil {
+ afterUpdate, err := TestProxy_Root_LogicalDevice.Update(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, retrieved, false,
+ "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Faield to update logical device info due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if afterUpdate == nil {
t.Error("Failed to update logical device")
} else {
t.Logf("Updated logical device : %+v", afterUpdate)
@@ -532,7 +676,12 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get logical device info due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated logical device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -544,8 +693,15 @@
func TestProxy_2_3_2_Update_LogicalDeviceFlows(t *testing.T) {
// Get a device proxy and update a specific port
- ldFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", false)
- flows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ ldFlowsProxy, err := TestProxy_Root.node.CreateProxy(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", false)
+ if err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Failed to create logical device flows proxy")
+ }
+ flows, err := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from logical device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
flows.(*openflow_13.Flows).Items[0].TableId = rand.Uint32()
t.Logf("before updated flows: %+v", flows)
@@ -558,27 +714,40 @@
commonCallback2,
)
- kvFlows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
-
+ kvFlows, err := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Faield to get flows from logical device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
if reflect.DeepEqual(flows, kvFlows) {
t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
}
- if updated := ldFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ updated, err := ldFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to update flows in logical device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if updated == nil {
t.Error("Failed to update logical device flows")
} else {
t.Logf("Updated logical device flows : %+v", updated)
}
- if d := ldFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
+ if d, _ := ldFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
t.Error("Failed to find updated logical device flows (flows proxy)")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found flows (flows proxy): %s", string(djson))
}
- if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", 0, false,
- ""); !reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", 0, false,
+ "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get flows from logical device flows proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated logical device flows (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -602,7 +771,12 @@
"POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
)
- if removed := TestProxy_Root_LogicalDevice.Remove(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, ""); removed == nil {
+ removed, err := TestProxy_Root_LogicalDevice.Remove(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to remove device from logical devices proxy due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if removed == nil {
t.Error("Failed to remove logical device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -615,7 +789,12 @@
t.Error("POST_REMOVE callback was not executed")
}
- if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ d, err := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, "")
+ if err != nil {
+ BenchmarkProxy_Logger.Errorf("Failed to get logical device info due to error: %v", err)
+ assert.NotNil(t, err)
+ }
+ if reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
diff --git a/db/model/revision.go b/db/model/revision.go
index 29fc5e9..9addad4 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -47,7 +47,7 @@
GetNode() *node
SetLastUpdate(ts ...time.Time)
GetLastUpdate() time.Time
- LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision
+ LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error)
UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision
UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision
UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision
diff --git a/db/model/transaction.go b/db/model/transaction.go
index d7a34e7..e10236c 100644
--- a/db/model/transaction.go
+++ b/db/model/transaction.go
@@ -17,6 +17,7 @@
import (
"context"
+ "fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
)
@@ -32,32 +33,32 @@
}
return tx
}
-func (t *Transaction) Get(ctx context.Context, path string, depth int, deep bool) interface{} {
+func (t *Transaction) Get(ctx context.Context, path string, depth int, deep bool) (interface{}, error) {
if t.txid == "" {
log.Errorf("closed transaction")
- return nil
+ return nil, fmt.Errorf("closed transaction")
}
// TODO: need to review the return values at the different layers!!!!!
return t.proxy.Get(ctx, path, depth, deep, t.txid)
}
-func (t *Transaction) Update(ctx context.Context, path string, data interface{}, strict bool) interface{} {
+func (t *Transaction) Update(ctx context.Context, path string, data interface{}, strict bool) (interface{}, error) {
if t.txid == "" {
log.Errorf("closed transaction")
- return nil
+ return nil, fmt.Errorf("closed transaction")
}
return t.proxy.Update(ctx, path, data, strict, t.txid)
}
-func (t *Transaction) Add(ctx context.Context, path string, data interface{}) interface{} {
+func (t *Transaction) Add(ctx context.Context, path string, data interface{}) (interface{}, error) {
if t.txid == "" {
log.Errorf("closed transaction")
- return nil
+ return nil, fmt.Errorf("closed transaction")
}
return t.proxy.Add(ctx, path, data, t.txid)
}
-func (t *Transaction) Remove(ctx context.Context, path string) interface{} {
+func (t *Transaction) Remove(ctx context.Context, path string) (interface{}, error) {
if t.txid == "" {
log.Errorf("closed transaction")
- return nil
+ return nil, fmt.Errorf("closed transaction")
}
return t.proxy.Remove(ctx, path, t.txid)
}
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
index c33e5be..a52688a 100644
--- a/db/model/transaction_test.go
+++ b/db/model/transaction_test.go
@@ -20,8 +20,10 @@
"context"
"encoding/hex"
"github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/v2/go/common"
"github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/stretchr/testify/assert"
"strconv"
"testing"
)
@@ -34,26 +36,13 @@
)
func init() {
+ var err error
TestTransaction_Root = NewRoot(&voltha.Voltha{}, nil)
- TestTransaction_RootProxy = TestTransaction_Root.node.CreateProxy(context.Background(), "/", false)
+ if TestTransaction_RootProxy, err = TestTransaction_Root.node.CreateProxy(context.Background(), "/", false); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot create proxy")
+ }
}
-//func TestTransaction_1_GetDevices(t *testing.T) {
-// getTx := TestTransaction_RootProxy.OpenTransaction()
-//
-// devices := getTx.Get("/devices", 1, false)
-//
-// if len(devices.([]interface{})) == 0 {
-// t.Error("there are no available devices to retrieve")
-// } else {
-// // Save the target device id for later tests
-// TestTransaction_TargetDeviceId = devices.([]interface{})[0].(*voltha.Device).Id
-// t.Logf("retrieved devices: %+v", devices)
-// }
-//
-// getTx.Commit()
-//}
-
func TestTransaction_2_AddDevice(t *testing.T) {
devIDBin, _ := uuid.New().MarshalBinary()
TestTransaction_DeviceId = "0001" + hex.EncodeToString(devIDBin)[:12]
@@ -80,7 +69,12 @@
addTx := TestTransaction_RootProxy.OpenTransaction()
- if added := addTx.Add(context.Background(), "/devices", device); added == nil {
+ added, err := addTx.Add(context.Background(), "/devices", device)
+ if err != nil {
+ log.Errorf("Failed to add device due to error %v", err)
+ assert.NotNil(t, err)
+ }
+ if added == nil {
t.Error("Failed to add device")
} else {
TestTransaction_TargetDeviceId = added.(*voltha.Device).Id
@@ -94,12 +88,20 @@
basePath := "/devices/" + TestTransaction_DeviceId
getDevWithPortsTx := TestTransaction_RootProxy.OpenTransaction()
- device1 := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ device1, err := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ if err != nil {
+ log.Errorf("Failed to get device with ports due to error %v", err)
+ assert.NotNil(t, err)
+ }
t.Logf("retrieved device with ports: %+v", device1)
getDevWithPortsTx.Commit()
getDevTx := TestTransaction_RootProxy.OpenTransaction()
- device2 := getDevTx.Get(context.Background(), basePath, 0, false)
+ device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
+ if err != nil {
+ log.Errorf("Failed to open transaction due to error %v", err)
+ assert.NotNil(t, err)
+ }
t.Logf("retrieved device: %+v", device2)
getDevTx.Commit()
@@ -107,7 +109,10 @@
func TestTransaction_4_UpdateDevice(t *testing.T) {
updateTx := TestTransaction_RootProxy.OpenTransaction()
- if retrieved := updateTx.Get(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, 1, false); retrieved == nil {
+ if retrieved, err := updateTx.Get(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, 1, false); err != nil {
+ log.Errorf("Failed to retrieve device info due to error %v", err)
+ assert.NotNil(t, err)
+ } else if retrieved == nil {
t.Error("Failed to get device")
} else {
var fwVersion int
@@ -123,7 +128,12 @@
t.Logf("Before update : %+v", retrieved)
// FIXME: The makeBranch passed in function is nil or not being executed properly!!!!!
- if afterUpdate := updateTx.Update(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, retrieved, false); afterUpdate == nil {
+ afterUpdate, err := updateTx.Update(context.Background(), "/devices/"+TestTransaction_TargetDeviceId, retrieved, false)
+ if err != nil {
+ log.Errorf("Failed to update device info due to error %v", err)
+ assert.NotNil(t, err)
+ }
+ if afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate)
@@ -137,12 +147,20 @@
basePath := "/devices/" + TestTransaction_DeviceId
getDevWithPortsTx := TestTransaction_RootProxy.OpenTransaction()
- device1 := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ device1, err := getDevWithPortsTx.Get(context.Background(), basePath+"/ports", 1, false)
+ if err != nil {
+ log.Errorf("Failed to device with ports info due to error %v", err)
+ assert.NotNil(t, err)
+ }
t.Logf("retrieved device with ports: %+v", device1)
getDevWithPortsTx.Commit()
getDevTx := TestTransaction_RootProxy.OpenTransaction()
- device2 := getDevTx.Get(context.Background(), basePath, 0, false)
+ device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
+ if err != nil {
+ log.Errorf("Failed to get device info due to error %v", err)
+ assert.NotNil(t, err)
+ }
t.Logf("retrieved device: %+v", device2)
getDevTx.Commit()
@@ -150,7 +168,12 @@
func TestTransaction_6_RemoveDevice(t *testing.T) {
removeTx := TestTransaction_RootProxy.OpenTransaction()
- if removed := removeTx.Remove(context.Background(), "/devices/"+TestTransaction_DeviceId); removed == nil {
+ removed, err := removeTx.Remove(context.Background(), "/devices/"+TestTransaction_DeviceId)
+ if err != nil {
+ log.Errorf("Failed to remove device due to error %v", err)
+ assert.NotNil(t, err)
+ }
+ if removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -163,7 +186,11 @@
basePath := "/devices/" + TestTransaction_DeviceId
getDevTx := TestTransaction_RootProxy.OpenTransaction()
- device := TestTransaction_RootProxy.Get(context.Background(), basePath, 0, false, "")
+ device, err := TestTransaction_RootProxy.Get(context.Background(), basePath, 0, false, "")
+ if err != nil {
+ log.Errorf("Failed to get device info post remove due to error %v", err)
+ assert.NotNil(t, err)
+ }
t.Logf("retrieved device: %+v", device)
getDevTx.Commit()