[VOL-4290] Voltha go library updates for gRPC migration

Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 79827aa..185f6ec 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -27,10 +27,8 @@
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	"github.com/google/uuid"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 // consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
@@ -38,7 +36,7 @@
 //consumer or a group consumer
 type consumerChannels struct {
 	consumers []interface{}
-	channels  []chan *ic.InterContainerMessage
+	channels  []chan proto.Message
 }
 
 // static check to ensure SaramaClient implements Client
@@ -378,7 +376,7 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -388,13 +386,13 @@
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
 		logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
-		ch := make(chan *ic.InterContainerMessage)
+		ch := make(chan proto.Message)
 		sc.addChannelToConsumerChannelMap(ctx, topic, ch)
 		return ch, nil
 	}
 
 	// Register for the topic and set it up
-	var consumerListeningChannel chan *ic.InterContainerMessage
+	var consumerListeningChannel chan proto.Message
 	var err error
 
 	// Use the consumerType option to figure out the type of consumer to launch
@@ -441,7 +439,7 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -609,7 +607,7 @@
 			// without blocking others. The monitor shouldn't really fall
 			// behind...
 			sc.liveness = make(chan bool, 10)
-			// post intial state to the channel
+			// post initial state to the channel
 			sc.liveness <- sc.alive
 		}
 	} else {
@@ -635,7 +633,7 @@
 			// without blocking others. The monitor shouldn't really fall
 			// behind...
 			sc.healthiness = make(chan bool, 10)
-			// post intial state to the channel
+			// post initial state to the channel
 			sc.healthiness <- sc.healthy
 		}
 	} else {
@@ -749,7 +747,7 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -788,7 +786,7 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -908,19 +906,18 @@
 
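-// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
+// dispatchToConsumers sends the proto message received on a given topic to all subscribers for that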
 // topic via the unique channel each subscriber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.RLock()
 	for _, ch := range consumerCh.channels {
-		go func(c chan *ic.InterContainerMessage) {
+		go func(c chan proto.Message) {
 			c <- protoMessage
 		}(ch)
 	}
 	sc.lockTopicToConsumerChannelMap.RUnlock()
 
 	if callback := sc.metadataCallback; callback != nil {
-		ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
-		callback(protoMessage.Header.FromTopic, ts)
+		callback(fromTopic, ts) // topic and timestamp are supplied by the caller; the payload header is no longer inspected here
 	}
 }
 
@@ -948,12 +945,12 @@
 			msgBody := msg.Value
 			sc.updateLiveness(ctx, true)
 			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
-			icm := &ic.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
+			var protoMsg proto.Message // NOTE(review): never assigned, so a nil interface is passed to proto.Unmarshal below; presumably a concrete message type must be allocated first — confirm
+			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
 				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
 				continue
 			}
-			go sc.dispatchToConsumers(consumerChnls, icm)
+			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
 		case <-sc.doneCh:
 			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
@@ -989,12 +986,12 @@
 			sc.updateLiveness(ctx, true)
 			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
-			icm := &ic.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
+			var protoMsg proto.Message // NOTE(review): never assigned, so a nil interface is passed to proto.Unmarshal below; presumably a concrete message type must be allocated first — confirm
+			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
 				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
 				continue
 			}
-			go sc.dispatchToConsumers(consumerChnls, icm)
+			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
 			consumer.MarkOffset(msg, "")
 		case ntf := <-consumer.Notifications():
 			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
@@ -1030,7 +1027,7 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
@@ -1046,10 +1043,10 @@
 
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	consumerListeningChannel := make(chan proto.Message)
 	cc := &consumerChannels{
 		consumers: consumersIf,
-		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan proto.Message{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -1069,7 +1066,7 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
@@ -1079,10 +1076,10 @@
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	consumerListeningChannel := make(chan proto.Message)
 	cc := &consumerChannels{
 		consumers: []interface{}{pConsumer},
-		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan proto.Message{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -1120,9 +1117,9 @@
 	return pConsumers, nil
 }
 
-func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
 	var i int
-	var channel chan *ic.InterContainerMessage
+	var channel chan proto.Message
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]