VOL-2180 context changes in voltha-go
Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started and the creating goroutine makes no attempt to wait for it, context.Background() was used.
Anywhere a new goroutine is started and the creating goroutine waits for its completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should propagate all the way through to the KV store.
Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/db/model/base_test.go b/db/model/base_test.go
index e9e4cac..91fa89f 100644
--- a/db/model/base_test.go
+++ b/db/model/base_test.go
@@ -16,6 +16,7 @@
package model
import (
+ "context"
"runtime/debug"
"sync"
@@ -25,7 +26,7 @@
var callbackMutex sync.Mutex
-func commonChanCallback(args ...interface{}) interface{} {
+func commonChanCallback(ctx context.Context, args ...interface{}) interface{} {
log.Infof("Running common callback - arg count: %d", len(args))
//for i := 0; i < len(args); i++ {
@@ -47,13 +48,13 @@
return nil
}
-func commonCallback2(args ...interface{}) interface{} {
+func commonCallback2(ctx context.Context, args ...interface{}) interface{} {
log.Infof("Running common2 callback - arg count: %d %+v", len(args), args)
return nil
}
-func commonCallbackFunc(args ...interface{}) interface{} {
+func commonCallbackFunc(ctx context.Context, args ...interface{}) interface{} {
log.Infof("Running common callback - arg count: %d", len(args))
for i := 0; i < len(args); i++ {
@@ -67,14 +68,14 @@
return nil
}
-func firstCallback(args ...interface{}) interface{} {
+func firstCallback(ctx context.Context, args ...interface{}) interface{} {
name := args[0]
id := args[1]
log.Infof("Running first callback - name: %s, id: %s\n", name, id)
return nil
}
-func secondCallback(args ...interface{}) interface{} {
+func secondCallback(ctx context.Context, args ...interface{}) interface{} {
name := args[0].(map[string]string)
id := args[1]
log.Infof("Running second callback - name: %s, id: %f\n", name["name"], id)
@@ -83,7 +84,7 @@
return nil
}
-func thirdCallback(args ...interface{}) interface{} {
+func thirdCallback(ctx context.Context, args ...interface{}) interface{} {
name := args[0]
id := args[1].(*voltha.Device)
log.Infof("Running third callback - name: %+v, id: %s\n", name, id.Id)
diff --git a/db/model/merge.go b/db/model/merge.go
index 1c9d0a5..5d46545 100644
--- a/db/model/merge.go
+++ b/db/model/merge.go
@@ -17,6 +17,8 @@
package model
import (
+ "context"
+
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
@@ -86,6 +88,7 @@
// Merge3Way takes care of combining the revision contents of the same data set
func Merge3Way(
+ ctx context.Context,
forkRev, srcRev, dstRev Revision,
mergeChildFunc func(Revision) Revision,
dryRun bool) (rev Revision, changes []ChangeTuple) {
@@ -261,7 +264,7 @@
// FIXME: Do not discard the latest value for now
//dstRev.GetBranch().GetLatest().Drop("", configChanged)
- rev = rev.UpdateAllChildren(newChildren, dstRev.GetBranch())
+ rev = rev.UpdateAllChildren(ctx, newChildren, dstRev.GetBranch())
if configChanged {
changes = append(changes, ChangeTuple{PostUpdate, dstRev.GetBranch().GetLatest().GetData(), rev.GetData()})
diff --git a/db/model/node.go b/db/model/node.go
index 3bead57..bec07a5 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -38,7 +38,7 @@
// Node interface is an abstraction of the node data structure
type Node interface {
- MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
+ MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
// CRUD functions
Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
@@ -52,11 +52,11 @@
MakeBranch(txid string) *Branch
DeleteBranch(txid string)
- MergeBranch(txid string, dryRun bool) (Revision, error)
+ MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error)
MakeTxBranch() string
DeleteTxBranch(txid string)
- FoldTxBranch(txid string)
+ FoldTxBranch(ctx context.Context, txid string)
}
type node struct {
@@ -378,7 +378,7 @@
//getPath traverses the specified path and retrieves the data associated to it
func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
if path == "" {
- return n.getData(rev, depth)
+ return n.getData(ctx, rev, depth)
}
partition := strings.SplitN(path, "/", 2)
@@ -413,7 +413,7 @@
var response []interface{}
for _, childRev := range children {
childNode := childRev.getNode()
- value := childNode.getData(childRev, depth)
+ value := childNode.getData(ctx, childRev, depth)
response = append(response, value)
}
return response
@@ -425,7 +425,7 @@
}
for _, childRev := range children {
childNode := childRev.getNode()
- value := childNode.getData(childRev, depth)
+ value := childNode.getData(ctx, childRev, depth)
response = append(response, value)
}
return response
@@ -439,13 +439,13 @@
}
// getData retrieves the data from a node revision
-func (n *node) getData(rev Revision, depth int) interface{} {
+func (n *node) getData(ctx context.Context, rev Revision, depth int) interface{} {
msg := rev.GetBranch().GetLatest().Get(depth)
var modifiedMsg interface{}
if n.GetProxy() != nil {
log.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
- if modifiedMsg = n.GetProxy().InvokeCallbacks(Get, false, msg); modifiedMsg != nil {
+ if modifiedMsg = n.GetProxy().InvokeCallbacks(ctx, Get, false, msg); modifiedMsg != nil {
msg = modifiedMsg
}
@@ -605,7 +605,7 @@
if n.GetProxy() != nil {
log.Debug("invoking proxy PreUpdate Callbacks")
- n.GetProxy().InvokeCallbacks(PreUpdate, false, branch.GetLatest(), data)
+ n.GetProxy().InvokeCallbacks(ctx, PreUpdate, false, branch.GetLatest(), data)
}
if branch.GetLatest().GetData().(proto.Message).String() != data.(proto.Message).String() {
@@ -666,7 +666,7 @@
if field.Key != "" {
if n.GetProxy() != nil {
log.Debug("invoking proxy PreAdd Callbacks")
- n.GetProxy().InvokeCallbacks(PreAdd, false, data)
+ n.GetProxy().InvokeCallbacks(ctx, PreAdd, false, data)
}
children = make([]Revision, len(rev.GetChildren(name)))
@@ -691,7 +691,7 @@
updatedRev := rev.UpdateChildren(ctx, name, children, branch)
changes := []ChangeTuple{{PostAdd, nil, childRev.GetData()}}
- childRev.SetupWatch(childRev.GetName())
+ childRev.SetupWatch(ctx, childRev.GetName())
n.makeLatest(branch, updatedRev, changes)
@@ -828,13 +828,13 @@
if childRev != nil && idx >= 0 {
if n.GetProxy() != nil {
data := childRev.GetData()
- n.GetProxy().InvokeCallbacks(PreRemove, false, data)
+ n.GetProxy().InvokeCallbacks(ctx, PreRemove, false, data)
postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, data, nil})
} else {
postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, childRev.GetData(), nil})
}
- childRev.StorageDrop(txid, true)
+ childRev.StorageDrop(ctx, txid, true)
getRevCache().Delete(childRev.GetName())
branch.LatestLock.Lock()
@@ -877,12 +877,12 @@
delete(n.Branches, txid)
}
-func (n *node) mergeChild(txid string, dryRun bool) func(Revision) Revision {
+func (n *node) mergeChild(ctx context.Context, txid string, dryRun bool) func(Revision) Revision {
f := func(rev Revision) Revision {
childBranch := rev.GetBranch()
if childBranch.Txid == txid {
- rev, _ = childBranch.Node.MergeBranch(txid, dryRun)
+ rev, _ = childBranch.Node.MergeBranch(ctx, txid, dryRun)
}
return rev
@@ -891,7 +891,7 @@
}
// MergeBranch will integrate the contents of a transaction branch within the latest branch of a given node
-func (n *node) MergeBranch(txid string, dryRun bool) (Revision, error) {
+func (n *node) MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error) {
srcBranch := n.GetBranch(txid)
dstBranch := n.GetBranch(NONE)
@@ -899,7 +899,7 @@
srcRev := srcBranch.GetLatest()
dstRev := dstBranch.GetLatest()
- rev, changes := Merge3Way(forkRev, srcRev, dstRev, n.mergeChild(txid, dryRun), dryRun)
+ rev, changes := Merge3Way(ctx, forkRev, srcRev, dstRev, n.mergeChild(ctx, txid, dryRun), dryRun)
if !dryRun {
if rev != nil {
diff --git a/db/model/non_persisted_revision.go b/db/model/non_persisted_revision.go
index bde80b1..347be0d 100644
--- a/db/model/non_persisted_revision.go
+++ b/db/model/non_persisted_revision.go
@@ -204,7 +204,7 @@
}
// Finalize -
-func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
+func (npr *NonPersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
npr.Hash = npr.hashContent()
}
@@ -327,7 +327,7 @@
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
- newRev.Finalize(false)
+ newRev.Finalize(ctx, false)
log.Debugw("update-data-complete", log.Fields{"updated": newRev.Config.Data, "provided": data})
@@ -433,13 +433,13 @@
updatedRev.SetChildren(name, children)
}
- updatedRev.Finalize(false)
+ updatedRev.Finalize(ctx, false)
return updatedRev
}
// UpdateAllChildren will replace the current list of children with the provided ones
-func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+func (npr *NonPersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -454,7 +454,7 @@
for entryName, childrenEntry := range children {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
- newRev.Finalize(false)
+ newRev.Finalize(ctx, false)
return newRev
}
@@ -524,12 +524,12 @@
}
// SetupWatch -
-func (npr *NonPersistedRevision) SetupWatch(key string) {
+func (npr *NonPersistedRevision) SetupWatch(ctx context.Context, key string) {
// stub ... required by interface
}
// StorageDrop -
-func (npr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
+func (npr *NonPersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
// stub ... required by interface
}
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index f6309ce..bbb4a1d 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -81,11 +81,11 @@
}
// Finalize is responsible of saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize(skipOnExist bool) {
- pr.store(skipOnExist)
+func (pr *PersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
+ pr.store(ctx, skipOnExist)
}
-func (pr *PersistedRevision) store(skipOnExist bool) {
+func (pr *PersistedRevision) store(ctx context.Context, skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
}
@@ -110,7 +110,7 @@
}
getRevCache().Set(pr.GetName(), pr)
- if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
+ if err := pr.kvStore.Put(ctx, pr.GetName(), blob); err != nil {
log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
} else {
log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
@@ -120,7 +120,7 @@
}
// SetupWatch -
-func (pr *PersistedRevision) SetupWatch(key string) {
+func (pr *PersistedRevision) SetupWatch(ctx context.Context, key string) {
if key == "" {
log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
return
@@ -136,7 +136,7 @@
log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
pr.SetName(key)
- pr.events = pr.kvStore.CreateWatch(key)
+ pr.events = pr.kvStore.CreateWatch(ctx, key)
}
if !pr.isWatched {
@@ -145,11 +145,11 @@
log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
// Start watching
- go pr.startWatching()
+ go pr.startWatching(ctx)
}
}
-func (pr *PersistedRevision) startWatching() {
+func (pr *PersistedRevision) startWatching(ctx context.Context) {
log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
StopWatchLoop:
@@ -232,18 +232,18 @@
pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
// Reserve the path to prevent others to modify while we reload from persistence
- if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
}
latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
// Load changes and apply to memory
- if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+ if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
}
// Release path
- if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(pathLock + "_"); err != nil {
+ if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_"); err != nil {
log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
}
} else {
@@ -251,7 +251,7 @@
log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
// Load changes and apply to memory
- if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+ if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
}
}
@@ -319,10 +319,10 @@
}
// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
-func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
- newNPR := pr.Revision.UpdateAllChildren(children, branch)
+ newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
newPR := &PersistedRevision{
Revision: newNPR,
@@ -351,7 +351,7 @@
// StorageDrop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
-func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
+func (pr *PersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
pr.mutex.Lock()
@@ -362,7 +362,7 @@
pr.isWatched = false
}
- if err := pr.kvStore.Delete(pr.GetName()); err != nil {
+ if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
} else {
pr.isStored = false
@@ -412,7 +412,7 @@
updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
- updatedChildRev.SetupWatch(updatedChildRev.GetName())
+ updatedChildRev.SetupWatch(ctx, updatedChildRev.GetName())
updatedChildRev.SetLastUpdate()
updatedChildRev.(*PersistedRevision).setVersion(version)
@@ -482,7 +482,7 @@
// We need to start watching this entry for future changes
childRev.SetName(typeName + "/" + keyValue)
- childRev.SetupWatch(childRev.GetName())
+ childRev.SetupWatch(ctx, childRev.GetName())
childRev.(*PersistedRevision).setVersion(version)
// Add entry to cache
@@ -537,7 +537,7 @@
if len(blobs) == 0 {
log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
- if blobs, err = pr.kvStore.List(path); err != nil {
+ if blobs, err = pr.kvStore.List(ctx, path); err != nil {
log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
return nil, err
}
diff --git a/db/model/proxy.go b/db/model/proxy.go
index e4a8e6f..3ffc9ff 100644
--- a/db/model/proxy.go
+++ b/db/model/proxy.go
@@ -295,12 +295,12 @@
})
if p.getRoot().KvStore != nil {
- if _, err := p.getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
return nil, err
}
defer func() {
- err := p.getRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
if err != nil {
log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
}
@@ -349,12 +349,12 @@
})
if p.getRoot().KvStore != nil {
- if _, err := p.getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
return nil, err
}
defer func() {
- err := p.getRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
if err != nil {
log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
}
@@ -401,12 +401,12 @@
})
if p.getRoot().KvStore != nil {
- if _, err := p.getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
return nil, err
}
defer func() {
- err := p.getRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
if err != nil {
log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
}
@@ -453,12 +453,12 @@
})
if p.getRoot().KvStore != nil {
- if _, err := p.getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
return nil, err
}
defer func() {
- err := p.getRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
if err != nil {
log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
}
@@ -506,12 +506,12 @@
})
if p.getRoot().KvStore != nil {
- if _, err := p.getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+ if _, err := p.getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
log.Errorw("unable-to-acquire-key-from-kvstore", log.Fields{"error": err})
return nil, err
}
defer func() {
- err := p.getRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ err := p.getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_")
if err != nil {
log.Errorw("Unable to release reservation for key", log.Fields{"error": err})
}
@@ -527,8 +527,8 @@
}
// commitTransaction will apply and merge modifications made in the transaction branch to the data model
-func (p *Proxy) commitTransaction(txid string) {
- p.getRoot().FoldTxBranch(txid)
+func (p *Proxy) commitTransaction(ctx context.Context, txid string) {
+ p.getRoot().FoldTxBranch(ctx, txid)
}
// cancelTransaction will terminate a transaction branch along will all changes within it
@@ -537,7 +537,7 @@
}
// CallbackFunction is a type used to define callback functions
-type CallbackFunction func(args ...interface{}) interface{}
+type CallbackFunction func(ctx context.Context, args ...interface{}) interface{}
// CallbackTuple holds the function and arguments details of a callback
type CallbackTuple struct {
@@ -546,14 +546,14 @@
}
// Execute will process the a callback with its provided arguments
-func (tuple *CallbackTuple) Execute(contextArgs []interface{}) interface{} {
+func (tuple *CallbackTuple) Execute(ctx context.Context, contextArgs []interface{}) interface{} {
args := []interface{}{}
args = append(args, tuple.args...)
args = append(args, contextArgs...)
- return tuple.callback(args...)
+ return tuple.callback(ctx, args...)
}
// RegisterCallback associates a callback to the proxy
@@ -588,7 +588,7 @@
p.DeleteCallback(callbackType, funcHash)
}
-func (p *Proxy) invoke(callback *CallbackTuple, context []interface{}) (result interface{}, err error) {
+func (p *Proxy) invoke(ctx context.Context, callback *CallbackTuple, context []interface{}) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
errStr := fmt.Sprintf("callback error occurred: %+v", r)
@@ -597,13 +597,13 @@
}
}()
- result = callback.Execute(context)
+ result = callback.Execute(ctx, context)
return result, err
}
// InvokeCallbacks executes all callbacks associated to a specific type
-func (p *Proxy) InvokeCallbacks(args ...interface{}) (result interface{}) {
+func (p *Proxy) InvokeCallbacks(ctx context.Context, args ...interface{}) (result interface{}) {
callbackType := args[0].(CallbackType)
proceedOnError := args[1].(bool)
context := args[2:]
@@ -613,7 +613,7 @@
if callbacks := p.getCallbacks(callbackType); callbacks != nil {
p.mutex.Lock()
for _, callback := range callbacks {
- if result, err = p.invoke(callback, context); err != nil {
+ if result, err = p.invoke(ctx, callback, context); err != nil {
if !proceedOnError {
log.Info("An error occurred. Stopping callback invocation")
break
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index da022e8..6fb5a6f 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -819,11 +819,15 @@
}
func TestProxy_Callbacks_2_Invoke_WithNoInterruption(t *testing.T) {
- TestProxyRootDevice.InvokeCallbacks(PreAdd, false, nil)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ TestProxyRootDevice.InvokeCallbacks(ctx, PreAdd, false, nil)
}
func TestProxy_Callbacks_3_Invoke_WithInterruption(t *testing.T) {
- TestProxyRootDevice.InvokeCallbacks(PreAdd, true, nil)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ TestProxyRootDevice.InvokeCallbacks(ctx, PreAdd, true, nil)
}
func TestProxy_Callbacks_4_Unregister(t *testing.T) {
diff --git a/db/model/revision.go b/db/model/revision.go
index f7ecaba..77c1c02 100644
--- a/db/model/revision.go
+++ b/db/model/revision.go
@@ -25,11 +25,11 @@
// Revision -
type Revision interface {
- Finalize(bool)
+ Finalize(context.Context, bool)
SetConfig(revision *DataRevision)
GetConfig() *DataRevision
Drop(txid string, includeConfig bool)
- StorageDrop(txid string, includeConfig bool)
+ StorageDrop(ctx context.Context, txid string, includeConfig bool)
ChildDrop(childType string, childHash string)
ChildDropByName(childName string)
SetChildren(name string, children []Revision)
@@ -40,7 +40,7 @@
GetHash() string
ClearHash()
getVersion() int64
- SetupWatch(key string)
+ SetupWatch(ctx context.Context, key string)
SetName(name string)
GetName() string
SetBranch(branch *Branch)
@@ -53,5 +53,5 @@
LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error)
UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision
UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision
- UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision
+ UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision
}
diff --git a/db/model/root.go b/db/model/root.go
index 0b74ddf..3ae5614 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -33,7 +33,7 @@
type Root interface {
Node
- ExecuteCallbacks()
+ ExecuteCallbacks(ctx context.Context)
AddCallback(callback CallbackFunction, args ...interface{})
AddNotificationCallback(callback CallbackFunction, args ...interface{})
}
@@ -98,30 +98,30 @@
}
// FoldTxBranch will merge the contents of a transaction branch with the root object
-func (r *root) FoldTxBranch(txid string) {
+func (r *root) FoldTxBranch(ctx context.Context, txid string) {
// Start by doing a dry run of the merge
// If that fails, it bails out and the branch is deleted
- if _, err := r.node.MergeBranch(txid, true); err != nil {
+ if _, err := r.node.MergeBranch(ctx, txid, true); err != nil {
// Merge operation fails
r.DeleteTxBranch(txid)
} else {
- if _, err = r.node.MergeBranch(txid, false); err != nil {
+ if _, err = r.node.MergeBranch(ctx, txid, false); err != nil {
log.Errorw("Unable to integrate the contents of a transaction branch within the latest branch of a given node", log.Fields{"error": err})
}
- r.node.GetRoot().ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks(ctx)
r.DeleteTxBranch(txid)
}
}
// ExecuteCallbacks will invoke all the callbacks linked to root object
-func (r *root) ExecuteCallbacks() {
+func (r *root) ExecuteCallbacks(ctx context.Context) {
r.mutex.Lock()
defer r.mutex.Unlock()
for len(r.Callbacks) > 0 {
callback := r.Callbacks[0]
r.Callbacks = r.Callbacks[1:]
- go callback.Execute(nil)
+ go callback.Execute(ctx, nil)
}
//for len(r.NotificationCallbacks) > 0 {
// callback := r.NotificationCallbacks[0]
@@ -162,7 +162,7 @@
r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
}
-func (r *root) syncParent(childRev Revision, txid string) {
+func (r *root) syncParent(ctx context.Context, childRev Revision, txid string) {
data := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))
for fieldName := range ChildrenFields(data) {
@@ -174,7 +174,7 @@
}
r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), data))
- r.GetProxy().ParentNode.Latest(txid).Finalize(false)
+ r.GetProxy().ParentNode.Latest(txid).Finalize(ctx, false)
}
// Update modifies the content of an object at a given path with the provided data
@@ -193,13 +193,13 @@
if result != nil {
if r.GetProxy().FullPath != r.GetProxy().Path {
- r.syncParent(result, txid)
+ r.syncParent(ctx, result, txid)
} else {
- result.Finalize(false)
+ result.Finalize(ctx, false)
}
}
- r.node.GetRoot().ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks(ctx)
return result
}
@@ -219,8 +219,8 @@
}
if result != nil {
- result.Finalize(true)
- r.node.GetRoot().ExecuteCallbacks()
+ result.Finalize(ctx, true)
+ r.node.GetRoot().ExecuteCallbacks(ctx)
}
return result
}
@@ -239,14 +239,14 @@
result = r.node.Remove(ctx, path, "", nil)
}
- r.node.GetRoot().ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks(ctx)
return result
}
// MakeLatest updates a branch with the latest node revision
-func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
- r.makeLatest(branch, revision, changeAnnouncement)
+func (r *root) MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+ r.makeLatest(ctx, branch, revision, changeAnnouncement)
}
func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
@@ -257,7 +257,7 @@
return NewNonPersistedRevision(r, branch, data, children)
}
-func (r *root) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+func (r *root) makeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
r.node.makeLatest(branch, revision, changeAnnouncement)
if r.KvStore != nil && branch.Txid == "" {
@@ -273,7 +273,7 @@
// TODO report error
} else {
log.Debugf("Changing root to : %s", string(blob))
- if err := r.KvStore.Put("root", blob); err != nil {
+ if err := r.KvStore.Put(ctx, "root", blob); err != nil {
log.Errorf("failed to properly put value in kvstore - err: %s", err.Error())
}
}
diff --git a/db/model/transaction.go b/db/model/transaction.go
index 88be89b..7879a89 100644
--- a/db/model/transaction.go
+++ b/db/model/transaction.go
@@ -74,7 +74,7 @@
}
// Commit -
-func (t *Transaction) Commit() {
- t.proxy.commitTransaction(t.txid)
+func (t *Transaction) Commit(ctx context.Context) {
+ t.proxy.commitTransaction(ctx, t.txid)
t.txid = ""
}
diff --git a/db/model/transaction_test.go b/db/model/transaction_test.go
index 8149722..c66101b 100644
--- a/db/model/transaction_test.go
+++ b/db/model/transaction_test.go
@@ -21,6 +21,7 @@
"encoding/hex"
"strconv"
"testing"
+ "time"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -81,7 +82,9 @@
TestTransactionTargetDeviceID = added.(*voltha.Device).Id
t.Logf("Added device : %+v", added)
}
- addTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ addTx.Commit(ctx)
}
func TestTransaction_3_GetDevice_PostAdd(t *testing.T) {
@@ -95,7 +98,9 @@
assert.NotNil(t, err)
}
t.Logf("retrieved device with ports: %+v", device1)
- getDevWithPortsTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ getDevWithPortsTx.Commit(ctx)
getDevTx := TestTransactionRootProxy.OpenTransaction()
device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
@@ -105,7 +110,7 @@
}
t.Logf("retrieved device: %+v", device2)
- getDevTx.Commit()
+ getDevTx.Commit(ctx)
}
func TestTransaction_4_UpdateDevice(t *testing.T) {
@@ -140,7 +145,9 @@
t.Logf("Updated device : %+v", afterUpdate)
}
}
- updateTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ updateTx.Commit(ctx)
}
func TestTransaction_5_GetDevice_PostUpdate(t *testing.T) {
@@ -154,7 +161,9 @@
assert.NotNil(t, err)
}
t.Logf("retrieved device with ports: %+v", device1)
- getDevWithPortsTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ getDevWithPortsTx.Commit(ctx)
getDevTx := TestTransactionRootProxy.OpenTransaction()
device2, err := getDevTx.Get(context.Background(), basePath, 0, false)
@@ -164,7 +173,7 @@
}
t.Logf("retrieved device: %+v", device2)
- getDevTx.Commit()
+ getDevTx.Commit(ctx)
}
func TestTransaction_6_RemoveDevice(t *testing.T) {
@@ -179,7 +188,9 @@
} else {
t.Logf("Removed device : %+v", removed)
}
- removeTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ removeTx.Commit(ctx)
}
func TestTransaction_7_GetDevice_PostRemove(t *testing.T) {
@@ -194,5 +205,7 @@
}
t.Logf("retrieved device: %+v", device)
- getDevTx.Commit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ getDevTx.Commit(ctx)
}