VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started and the creating goroutine makes no attempt to wait for it, context.Background() was used.
Anywhere a new goroutine is started and the creating goroutine waits for its completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should recursively cancel all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/db/model/root.go b/db/model/root.go
index 0b74ddf..3ae5614 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -33,7 +33,7 @@
 type Root interface {
 	Node
 
-	ExecuteCallbacks()
+	ExecuteCallbacks(ctx context.Context)
 	AddCallback(callback CallbackFunction, args ...interface{})
 	AddNotificationCallback(callback CallbackFunction, args ...interface{})
 }
@@ -98,30 +98,30 @@
 }
 
 // FoldTxBranch will merge the contents of a transaction branch with the root object
-func (r *root) FoldTxBranch(txid string) {
+func (r *root) FoldTxBranch(ctx context.Context, txid string) {
 	// Start by doing a dry run of the merge
 	// If that fails, it bails out and the branch is deleted
-	if _, err := r.node.MergeBranch(txid, true); err != nil {
+	if _, err := r.node.MergeBranch(ctx, txid, true); err != nil {
 		// Merge operation fails
 		r.DeleteTxBranch(txid)
 	} else {
-		if _, err = r.node.MergeBranch(txid, false); err != nil {
+		if _, err = r.node.MergeBranch(ctx, txid, false); err != nil {
 			log.Errorw("Unable to integrate the contents of a transaction branch within the latest branch of a given node", log.Fields{"error": err})
 		}
-		r.node.GetRoot().ExecuteCallbacks()
+		r.node.GetRoot().ExecuteCallbacks(ctx)
 		r.DeleteTxBranch(txid)
 	}
 }
 
 // ExecuteCallbacks will invoke all the callbacks linked to root object
-func (r *root) ExecuteCallbacks() {
+func (r *root) ExecuteCallbacks(ctx context.Context) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 
 	for len(r.Callbacks) > 0 {
 		callback := r.Callbacks[0]
 		r.Callbacks = r.Callbacks[1:]
-		go callback.Execute(nil)
+		go callback.Execute(ctx, nil)
 	}
 	//for len(r.NotificationCallbacks) > 0 {
 	//	callback := r.NotificationCallbacks[0]
@@ -162,7 +162,7 @@
 	r.NotificationCallbacks = append(r.NotificationCallbacks, CallbackTuple{callback, args})
 }
 
-func (r *root) syncParent(childRev Revision, txid string) {
+func (r *root) syncParent(ctx context.Context, childRev Revision, txid string) {
 	data := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))
 
 	for fieldName := range ChildrenFields(data) {
@@ -174,7 +174,7 @@
 	}
 
 	r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), data))
-	r.GetProxy().ParentNode.Latest(txid).Finalize(false)
+	r.GetProxy().ParentNode.Latest(txid).Finalize(ctx, false)
 }
 
 // Update modifies the content of an object at a given path with the provided data
@@ -193,13 +193,13 @@
 
 	if result != nil {
 		if r.GetProxy().FullPath != r.GetProxy().Path {
-			r.syncParent(result, txid)
+			r.syncParent(ctx, result, txid)
 		} else {
-			result.Finalize(false)
+			result.Finalize(ctx, false)
 		}
 	}
 
-	r.node.GetRoot().ExecuteCallbacks()
+	r.node.GetRoot().ExecuteCallbacks(ctx)
 
 	return result
 }
@@ -219,8 +219,8 @@
 	}
 
 	if result != nil {
-		result.Finalize(true)
-		r.node.GetRoot().ExecuteCallbacks()
+		result.Finalize(ctx, true)
+		r.node.GetRoot().ExecuteCallbacks(ctx)
 	}
 	return result
 }
@@ -239,14 +239,14 @@
 		result = r.node.Remove(ctx, path, "", nil)
 	}
 
-	r.node.GetRoot().ExecuteCallbacks()
+	r.node.GetRoot().ExecuteCallbacks(ctx)
 
 	return result
 }
 
 // MakeLatest updates a branch with the latest node revision
-func (r *root) MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
-	r.makeLatest(branch, revision, changeAnnouncement)
+func (r *root) MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+	r.makeLatest(ctx, branch, revision, changeAnnouncement)
 }
 
 func (r *root) MakeRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
@@ -257,7 +257,7 @@
 	return NewNonPersistedRevision(r, branch, data, children)
 }
 
-func (r *root) makeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
+func (r *root) makeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple) {
 	r.node.makeLatest(branch, revision, changeAnnouncement)
 
 	if r.KvStore != nil && branch.Txid == "" {
@@ -273,7 +273,7 @@
 			// TODO report error
 		} else {
 			log.Debugf("Changing root to : %s", string(blob))
-			if err := r.KvStore.Put("root", blob); err != nil {
+			if err := r.KvStore.Put(ctx, "root", blob); err != nil {
 				log.Errorf("failed to properly put value in kvstore - err: %s", err.Error())
 			}
 		}