VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started, and the creating goroutine makes no attempt to wait, context.Background() was used.
Anywhere a new goroutine is started, and the creating goroutine waits for completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should recursively cancel all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index 9d4ab52..c0c16f9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -42,6 +42,9 @@
 	channels  []chan *ic.InterContainerMessage
 }
 
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
 	cAdmin                        sarama.ClusterAdmin
@@ -68,6 +71,7 @@
 	numReplicas                   int
 	autoCreateTopic               bool
 	doneCh                        chan int
+	metadataCallback              func(fromTopic string, timestamp int64)
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
@@ -460,6 +464,10 @@
 	return err
 }
 
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+	sc.metadataCallback = callback
+}
+
 func (sc *SaramaClient) updateLiveness(alive bool) {
 	// Post a consistent stream of liveness data to the channel,
 	// so that in a live state, the core does not timeout and
@@ -930,12 +938,16 @@
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.RLock()
-	defer sc.lockTopicToConsumerChannelMap.RUnlock()
 	for _, ch := range consumerCh.channels {
 		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
+	sc.lockTopicToConsumerChannelMap.RUnlock()
+
+	if callback := sc.metadataCallback; callback != nil {
+		callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+	}
 }
 
 func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {