VOL-2180: Code changes for context addition
Integrate the InterContainerProxy interface changes.

Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index 9d4ab52..c0c16f9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -42,6 +42,9 @@
 	channels  []chan *ic.InterContainerMessage
 }
 
+// Compile-time check that *SaramaClient implements the Client interface.
+var _ Client = &SaramaClient{}
+
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
 	cAdmin                        sarama.ClusterAdmin
@@ -68,6 +71,7 @@
 	numReplicas                   int
 	autoCreateTopic               bool
 	doneCh                        chan int
+	metadataCallback              func(fromTopic string, timestamp int64)
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
@@ -460,6 +464,10 @@
 	return err
 }
 
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+	sc.metadataCallback = callback
+}
+
 func (sc *SaramaClient) updateLiveness(alive bool) {
 	// Post a consistent stream of liveness data to the channel,
 	// so that in a live state, the core does not timeout and
@@ -930,12 +938,16 @@
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.RLock()
-	defer sc.lockTopicToConsumerChannelMap.RUnlock()
 	for _, ch := range consumerCh.channels {
 		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
+	sc.lockTopicToConsumerChannelMap.RUnlock()
+
+	if callback := sc.metadataCallback; callback != nil {
+		callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+	}
 }
 
 func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {