Added SubscribeForMetadata() to the sarama client, to access the header field of each received message.

Also replaced TestKafkaClientImplementsKafkaClientIf() with static checks.

For VOL-2207.  Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207

Change-Id: I77f03fbb0ebcdf60cb5f8d0c8c3276325510e874
diff --git a/VERSION b/VERSION
index 4a36342..cb2b00e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.0
+3.0.1
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 6289043..9abad93 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -66,6 +66,7 @@
 	DeleteTopic(topic *Topic) error
 	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
+	SubscribeForMetadata(func(fromTopic string, timestamp int64))
 	Send(msg interface{}, topic *Topic, keys ...string) error
 	SendLiveness() error
 	EnableLivenessChannel(enable bool) chan bool
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 042e121..8285876 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -520,7 +520,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 9d4ab52..c0c16f9 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -42,6 +42,9 @@
 	channels  []chan *ic.InterContainerMessage
 }
 
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
 	cAdmin                        sarama.ClusterAdmin
@@ -68,6 +71,7 @@
 	numReplicas                   int
 	autoCreateTopic               bool
 	doneCh                        chan int
+	metadataCallback              func(fromTopic string, timestamp int64)
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
@@ -460,6 +464,10 @@
 	return err
 }
 
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+	sc.metadataCallback = callback
+}
+
 func (sc *SaramaClient) updateLiveness(alive bool) {
 	// Post a consistent stream of liveness data to the channel,
 	// so that in a live state, the core does not timeout and
@@ -930,12 +938,16 @@
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.RLock()
-	defer sc.lockTopicToConsumerChannelMap.RUnlock()
 	for _, ch := range consumerCh.channels {
 		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
+	sc.lockTopicToConsumerChannelMap.RUnlock()
+
+	if callback := sc.metadataCallback; callback != nil {
+		callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+	}
 }
 
 func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
diff --git a/pkg/mocks/kafka_client.go b/pkg/mocks/kafka_client.go
index 51bd532..62af5db 100644
--- a/pkg/mocks/kafka_client.go
+++ b/pkg/mocks/kafka_client.go
@@ -26,6 +26,9 @@
 	"google.golang.org/grpc/status"
 )
 
+// static check to ensure KafkaClient implements kafka.Client
+var _ kafka.Client = &KafkaClient{}
+
 type KafkaClient struct {
 	topicsChannelMap map[string][]chan *ic.InterContainerMessage
 	lock             sync.RWMutex
@@ -108,6 +111,10 @@
 	return nil
 }
 
+func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
+	panic("unimplemented")
+}
+
 func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
 	req, ok := msg.(*ic.InterContainerMessage)
 	if !ok {
diff --git a/pkg/mocks/kafka_client_test.go b/pkg/mocks/kafka_client_test.go
index e4517cc..dcf1973 100644
--- a/pkg/mocks/kafka_client_test.go
+++ b/pkg/mocks/kafka_client_test.go
@@ -25,14 +25,6 @@
 	"github.com/stretchr/testify/assert"
 )
 
-func TestKafkaClientImplementsKafkaClientIf(t *testing.T) {
-	client := NewKafkaClient()
-
-	if _, ok := interface{}(client).(kafka.Client); !ok {
-		t.Error("mock kafka client does not implement voltha-lib-go/v3/pkg/kafka/Client interface")
-	}
-}
-
 func TestKafkaClientCreateTopic(t *testing.T) {
 	cTkc := NewKafkaClient()
 	topic := kafka.Topic{Name: "myTopic"}