VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC api, the context is passed through directly.
Where context reached the kafka api, context.TODO() was used (as this NBI does not support context or request cancelation)
Anywhere a new thread is started, and the creating thread makes no attempt to wait, context.Background() was used.
Anywhere a new thread is started, and the creating thread waits for completion, the ctx is passed through from the creating thread.
Cancelation of gRPC NBI requests should recursively cancel all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/db/model/persisted_revision.go b/db/model/persisted_revision.go
index f6309ce..bbb4a1d 100644
--- a/db/model/persisted_revision.go
+++ b/db/model/persisted_revision.go
@@ -81,11 +81,11 @@
 }
 
 // Finalize is responsible of saving the revision in the persistent storage
-func (pr *PersistedRevision) Finalize(skipOnExist bool) {
-	pr.store(skipOnExist)
+func (pr *PersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
+	pr.store(ctx, skipOnExist)
 }
 
-func (pr *PersistedRevision) store(skipOnExist bool) {
+func (pr *PersistedRevision) store(ctx context.Context, skipOnExist bool) {
 	if pr.GetBranch().Txid != "" {
 		return
 	}
@@ -110,7 +110,7 @@
 		}
 
 		getRevCache().Set(pr.GetName(), pr)
-		if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
+		if err := pr.kvStore.Put(ctx, pr.GetName(), blob); err != nil {
 			log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
 		} else {
 			log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
@@ -120,7 +120,7 @@
 }
 
 // SetupWatch -
-func (pr *PersistedRevision) SetupWatch(key string) {
+func (pr *PersistedRevision) SetupWatch(ctx context.Context, key string) {
 	if key == "" {
 		log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 		return
@@ -136,7 +136,7 @@
 		log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 
 		pr.SetName(key)
-		pr.events = pr.kvStore.CreateWatch(key)
+		pr.events = pr.kvStore.CreateWatch(ctx, key)
 	}
 
 	if !pr.isWatched {
@@ -145,11 +145,11 @@
 		log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
 
 		// Start watching
-		go pr.startWatching()
+		go pr.startWatching(ctx)
 	}
 }
 
-func (pr *PersistedRevision) startWatching() {
+func (pr *PersistedRevision) startWatching(ctx context.Context) {
 	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
 
 StopWatchLoop:
@@ -232,18 +232,18 @@
 					pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
 
 					// Reserve the path to prevent others to modify while we reload from persistence
-					if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
+					if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
 						log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
 					}
 					latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
 
 					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
 						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
 					}
 
 					// Release path
-					if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(pathLock + "_"); err != nil {
+					if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_"); err != nil {
 						log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
 					}
 				} else {
@@ -251,7 +251,7 @@
 					log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
 
 					// Load changes and apply to memory
-					if _, err = latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs); err != nil {
+					if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
 						log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
 					}
 				}
@@ -319,10 +319,10 @@
 }
 
 // UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
-func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
 	log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
 
-	newNPR := pr.Revision.UpdateAllChildren(children, branch)
+	newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
 
 	newPR := &PersistedRevision{
 		Revision:  newNPR,
@@ -351,7 +351,7 @@
 
 // StorageDrop takes care of eliminating a revision hash that is no longer needed
 // and its associated config when required
-func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
+func (pr *PersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
 	log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
 
 	pr.mutex.Lock()
@@ -362,7 +362,7 @@
 			pr.isWatched = false
 		}
 
-		if err := pr.kvStore.Delete(pr.GetName()); err != nil {
+		if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
 			log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
 		} else {
 			pr.isStored = false
@@ -412,7 +412,7 @@
 			updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
 
 			updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
-			updatedChildRev.SetupWatch(updatedChildRev.GetName())
+			updatedChildRev.SetupWatch(ctx, updatedChildRev.GetName())
 			updatedChildRev.SetLastUpdate()
 			updatedChildRev.(*PersistedRevision).setVersion(version)
 
@@ -482,7 +482,7 @@
 
 		// We need to start watching this entry for future changes
 		childRev.SetName(typeName + "/" + keyValue)
-		childRev.SetupWatch(childRev.GetName())
+		childRev.SetupWatch(ctx, childRev.GetName())
 		childRev.(*PersistedRevision).setVersion(version)
 
 		// Add entry to cache
@@ -537,7 +537,7 @@
 		if len(blobs) == 0 {
 			log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
 
-			if blobs, err = pr.kvStore.List(path); err != nil {
+			if blobs, err = pr.kvStore.List(ctx, path); err != nil {
 				log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
 				return nil, err
 			}