[VOL-4290] Voltha go library updates for gRPC migration
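
A minimal usage sketch of the mock after this change (illustrative only,
not part of the patch): subscription channels now carry plain
proto.Message values instead of *ic.InterContainerMessage, Subscribe
returns a buffered channel, and the liveness channel is functional.
The import alias, the topic name and the empty.Empty payload below are
placeholders; the module path assumes the v7 import path used in this
patch.

package main

import (
	"context"
	"fmt"

	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
	mocks "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
)

func main() {
	ctx := context.Background()
	kc := mocks.NewKafkaClient()
	topic := &kafka.Topic{Name: "test-topic"}

	// Subscribe now returns a buffered <-chan proto.Message.
	msgCh, err := kc.Subscribe(ctx, topic)
	if err != nil {
		panic(err)
	}

	// Any proto.Message can be sent; the mock fans it out to every
	// subscriber and drops the message if a subscriber's buffer is full.
	if err := kc.Send(ctx, &empty.Empty{}, topic); err != nil {
		panic(err)
	}
	fmt.Printf("received: %T\n", <-msgCh)

	// EnableLivenessChannel posts the initial state; SendLiveness posts
	// true on every call.
	liveCh := kc.EnableLivenessChannel(ctx, true)
	_ = kc.SendLiveness(ctx)
	fmt.Println("liveness:", <-liveCh, <-liveCh)
}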

Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index ce9dada..ea410ac 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -18,30 +18,34 @@
 import (
 	"context"
 	"fmt"
-	"github.com/golang/protobuf/ptypes"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+const (
+	maxConcurrentMessage = 100
+)
+
 // static check to ensure KafkaClient implements kafka.Client
 var _ kafka.Client = &KafkaClient{}
 
 type KafkaClient struct {
-	topicsChannelMap map[string][]chan *ic.InterContainerMessage
+	topicsChannelMap map[string][]chan proto.Message
 	lock             sync.RWMutex
+	alive            bool
+	livenessMutex    sync.Mutex
+	liveness         chan bool
 }
 
 func NewKafkaClient() *KafkaClient {
 	return &KafkaClient{
-		topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
+		topicsChannelMap: make(map[string][]chan proto.Message),
 		lock:             sync.RWMutex{},
 	}
 }
@@ -70,7 +74,7 @@
 	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
 		return fmt.Errorf("Topic %s already exist", topic.Name)
 	}
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return nil
 }
@@ -83,21 +87,21 @@
 	return nil
 }
 
-func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
 	logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message, maxConcurrentMessage)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return ch, nil
 }
 
-func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
 	s[i] = s[len(s)-1]
 	return s[:len(s)-1]
 }
 
-func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
 	logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
@@ -120,55 +124,50 @@
 	logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
 }
 
-func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
-	msg := &ic.InterContainerMessage{
-		Header: &ic.Header{
-			Id:        event.Header.Id,
-			Type:      ic.MessageType_REQUEST,
-			Timestamp: event.Header.RaisedTs,
-		},
-	}
-	// Marshal event
-	if eventBody, err := ptypes.MarshalAny(event); err == nil {
-		msg.Body = eventBody
-	}
-	return msg
-}
-
 func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
 	// Assert message is a proto message
-	// ascertain the value interface type is a proto.Message
-	if _, ok := msg.(proto.Message); !ok {
+	protoMsg, ok := msg.(proto.Message)
+	if !ok {
 		logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
 		return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
 	}
-	req, ok := msg.(*ic.InterContainerMessage)
-	if !ok {
-		event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
-		if !ok {
-			return status.Error(codes.InvalidArgument, "unexpected-message-type")
-		}
-		req = toIntercontainerMessage(event)
-	}
-	if req == nil {
-		return status.Error(codes.InvalidArgument, "msg-nil")
-	}
 	kc.lock.RLock()
 	defer kc.lock.RUnlock()
 	for _, ch := range kc.topicsChannelMap[topic.Name] {
-		logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
-		ch <- req
+		select {
+		case ch <- protoMsg:
+			logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		default:
+			logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		}
 	}
 	return nil
 }
 
 func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
-	return status.Error(codes.Unimplemented, "SendLiveness")
+	kc.livenessMutex.Lock()
+	defer kc.livenessMutex.Unlock()
+	if kc.liveness != nil {
+		kc.liveness <- true // I am a mock
+	}
+	return nil
 }
 
 func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
-	return nil
+	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	if enable {
+		kc.livenessMutex.Lock()
+		defer kc.livenessMutex.Unlock()
+		if kc.liveness == nil {
+			logger.Info(ctx, "kafka-create-liveness-channel")
+			kc.liveness = make(chan bool, 10)
+			// post the initial state to the channel
+			kc.liveness <- kc.alive
+		}
+	} else {
+		panic("Turning off liveness reporting is not supported")
+	}
+	return kc.liveness
 }
 
 func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {