VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where the context reaches the gRPC API, it is passed through directly.
Where the context reaches the Kafka API, context.TODO() is used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started and the creating goroutine makes no attempt to wait for it, context.Background() is used.
Anywhere a new goroutine is started and the creating goroutine waits for its completion, the ctx is passed through from the creating goroutine.
Cancellation of a gRPC NBI request should now propagate recursively all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/db/model/node.go b/db/model/node.go
index 3bead57..bec07a5 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -38,7 +38,7 @@
 
 // Node interface is an abstraction of the node data structure
 type Node interface {
-	MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
+	MakeLatest(ctx context.Context, branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
 
 	// CRUD functions
 	Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
@@ -52,11 +52,11 @@
 
 	MakeBranch(txid string) *Branch
 	DeleteBranch(txid string)
-	MergeBranch(txid string, dryRun bool) (Revision, error)
+	MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error)
 
 	MakeTxBranch() string
 	DeleteTxBranch(txid string)
-	FoldTxBranch(txid string)
+	FoldTxBranch(ctx context.Context, txid string)
 }
 
 type node struct {
@@ -378,7 +378,7 @@
 //getPath traverses the specified path and retrieves the data associated to it
 func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
 	if path == "" {
-		return n.getData(rev, depth)
+		return n.getData(ctx, rev, depth)
 	}
 
 	partition := strings.SplitN(path, "/", 2)
@@ -413,7 +413,7 @@
 			var response []interface{}
 			for _, childRev := range children {
 				childNode := childRev.getNode()
-				value := childNode.getData(childRev, depth)
+				value := childNode.getData(ctx, childRev, depth)
 				response = append(response, value)
 			}
 			return response
@@ -425,7 +425,7 @@
 		}
 		for _, childRev := range children {
 			childNode := childRev.getNode()
-			value := childNode.getData(childRev, depth)
+			value := childNode.getData(ctx, childRev, depth)
 			response = append(response, value)
 		}
 		return response
@@ -439,13 +439,13 @@
 }
 
 // getData retrieves the data from a node revision
-func (n *node) getData(rev Revision, depth int) interface{} {
+func (n *node) getData(ctx context.Context, rev Revision, depth int) interface{} {
 	msg := rev.GetBranch().GetLatest().Get(depth)
 	var modifiedMsg interface{}
 
 	if n.GetProxy() != nil {
 		log.Debugw("invoking-get-callbacks", log.Fields{"data": msg})
-		if modifiedMsg = n.GetProxy().InvokeCallbacks(Get, false, msg); modifiedMsg != nil {
+		if modifiedMsg = n.GetProxy().InvokeCallbacks(ctx, Get, false, msg); modifiedMsg != nil {
 			msg = modifiedMsg
 		}
 
@@ -605,7 +605,7 @@
 
 	if n.GetProxy() != nil {
 		log.Debug("invoking proxy PreUpdate Callbacks")
-		n.GetProxy().InvokeCallbacks(PreUpdate, false, branch.GetLatest(), data)
+		n.GetProxy().InvokeCallbacks(ctx, PreUpdate, false, branch.GetLatest(), data)
 	}
 
 	if branch.GetLatest().GetData().(proto.Message).String() != data.(proto.Message).String() {
@@ -666,7 +666,7 @@
 			if field.Key != "" {
 				if n.GetProxy() != nil {
 					log.Debug("invoking proxy PreAdd Callbacks")
-					n.GetProxy().InvokeCallbacks(PreAdd, false, data)
+					n.GetProxy().InvokeCallbacks(ctx, PreAdd, false, data)
 				}
 
 				children = make([]Revision, len(rev.GetChildren(name)))
@@ -691,7 +691,7 @@
 
 				updatedRev := rev.UpdateChildren(ctx, name, children, branch)
 				changes := []ChangeTuple{{PostAdd, nil, childRev.GetData()}}
-				childRev.SetupWatch(childRev.GetName())
+				childRev.SetupWatch(ctx, childRev.GetName())
 
 				n.makeLatest(branch, updatedRev, changes)
 
@@ -828,13 +828,13 @@
 			if childRev != nil && idx >= 0 {
 				if n.GetProxy() != nil {
 					data := childRev.GetData()
-					n.GetProxy().InvokeCallbacks(PreRemove, false, data)
+					n.GetProxy().InvokeCallbacks(ctx, PreRemove, false, data)
 					postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, data, nil})
 				} else {
 					postAnnouncement = append(postAnnouncement, ChangeTuple{PostRemove, childRev.GetData(), nil})
 				}
 
-				childRev.StorageDrop(txid, true)
+				childRev.StorageDrop(ctx, txid, true)
 				getRevCache().Delete(childRev.GetName())
 
 				branch.LatestLock.Lock()
@@ -877,12 +877,12 @@
 	delete(n.Branches, txid)
 }
 
-func (n *node) mergeChild(txid string, dryRun bool) func(Revision) Revision {
+func (n *node) mergeChild(ctx context.Context, txid string, dryRun bool) func(Revision) Revision {
 	f := func(rev Revision) Revision {
 		childBranch := rev.GetBranch()
 
 		if childBranch.Txid == txid {
-			rev, _ = childBranch.Node.MergeBranch(txid, dryRun)
+			rev, _ = childBranch.Node.MergeBranch(ctx, txid, dryRun)
 		}
 
 		return rev
@@ -891,7 +891,7 @@
 }
 
 // MergeBranch will integrate the contents of a transaction branch within the latest branch of a given node
-func (n *node) MergeBranch(txid string, dryRun bool) (Revision, error) {
+func (n *node) MergeBranch(ctx context.Context, txid string, dryRun bool) (Revision, error) {
 	srcBranch := n.GetBranch(txid)
 	dstBranch := n.GetBranch(NONE)
 
@@ -899,7 +899,7 @@
 	srcRev := srcBranch.GetLatest()
 	dstRev := dstBranch.GetLatest()
 
-	rev, changes := Merge3Way(forkRev, srcRev, dstRev, n.mergeChild(txid, dryRun), dryRun)
+	rev, changes := Merge3Way(ctx, forkRev, srcRev, dstRev, n.mergeChild(ctx, txid, dryRun), dryRun)
 
 	if !dryRun {
 		if rev != nil {