[VOL-4290] Voltha go library updates for gRPC migration

Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/mocks/kafka/common.go b/pkg/mocks/kafka/common.go
index 5c46fb1..ff4aec9 100644
--- a/pkg/mocks/kafka/common.go
+++ b/pkg/mocks/kafka/common.go
@@ -16,7 +16,7 @@
 package kafka
 
 import (
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/pkg/mocks/kafka/endpoint_manager.go b/pkg/mocks/kafka/endpoint_manager.go
deleted file mode 100644
index 807815d..0000000
--- a/pkg/mocks/kafka/endpoint_manager.go
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import (
-	"context"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-)
-
-type EndpointManager struct{}
-
-func NewEndpointManager() kafka.EndpointManager {
-	mock := &EndpointManager{}
-	return mock
-}
-
-func (em *EndpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (kafka.Endpoint, error) {
-	// TODO add mocks call and args
-	return kafka.Endpoint(serviceType), nil
-}
-
-func (em *EndpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
-	// TODO add mocks call and args
-	return true, nil
-}
-
-func (em *EndpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (kafka.ReplicaID, error) {
-	return kafka.ReplicaID(1), nil
-}
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index ce9dada..ea410ac 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -18,30 +18,34 @@
 import (
 	"context"
 	"fmt"
-	"github.com/golang/protobuf/ptypes"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+const (
+	maxConcurrentMessage = 100
+)
+
 // static check to ensure KafkaClient implements kafka.Client
 var _ kafka.Client = &KafkaClient{}
 
 type KafkaClient struct {
-	topicsChannelMap map[string][]chan *ic.InterContainerMessage
+	topicsChannelMap map[string][]chan proto.Message
 	lock             sync.RWMutex
+	alive            bool
+	livenessMutex    sync.Mutex
+	liveness         chan bool
 }
 
 func NewKafkaClient() *KafkaClient {
 	return &KafkaClient{
-		topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
+		topicsChannelMap: make(map[string][]chan proto.Message),
 		lock:             sync.RWMutex{},
 	}
 }
@@ -70,7 +74,7 @@
 	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
 		return fmt.Errorf("Topic %s already exist", topic.Name)
 	}
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return nil
 }
@@ -83,21 +87,21 @@
 	return nil
 }
 
-func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
 	logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message, maxConcurrentMessage)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return ch, nil
 }
 
-func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
 	s[i] = s[len(s)-1]
 	return s[:len(s)-1]
 }
 
-func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
 	logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
@@ -120,55 +124,50 @@
 	logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
 }
 
-func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
-	msg := &ic.InterContainerMessage{
-		Header: &ic.Header{
-			Id:        event.Header.Id,
-			Type:      ic.MessageType_REQUEST,
-			Timestamp: event.Header.RaisedTs,
-		},
-	}
-	// Marshal event
-	if eventBody, err := ptypes.MarshalAny(event); err == nil {
-		msg.Body = eventBody
-	}
-	return msg
-}
-
 func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
 	// Assert message is a proto message
-	// ascertain the value interface type is a proto.Message
-	if _, ok := msg.(proto.Message); !ok {
+	protoMsg, ok := msg.(proto.Message)
+	if !ok {
 		logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
 		return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
 	}
-	req, ok := msg.(*ic.InterContainerMessage)
-	if !ok {
-		event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
-		if !ok {
-			return status.Error(codes.InvalidArgument, "unexpected-message-type")
-		}
-		req = toIntercontainerMessage(event)
-	}
-	if req == nil {
-		return status.Error(codes.InvalidArgument, "msg-nil")
-	}
 	kc.lock.RLock()
 	defer kc.lock.RUnlock()
 	for _, ch := range kc.topicsChannelMap[topic.Name] {
-		logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
-		ch <- req
+		select {
+		case ch <- protoMsg:
+			logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		default:
+			logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		}
 	}
 	return nil
 }
 
 func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
-	return status.Error(codes.Unimplemented, "SendLiveness")
+	kc.livenessMutex.Lock()
+	defer kc.livenessMutex.Unlock()
+	if kc.liveness != nil {
+		kc.liveness <- true // I am a mock
+	}
+	return nil
 }
 
 func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
-	return nil
+	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	if enable {
+		kc.livenessMutex.Lock()
+		defer kc.livenessMutex.Unlock()
+		if kc.liveness == nil {
+			logger.Info(ctx, "kafka-create-liveness-channel")
+			kc.liveness = make(chan bool, 10)
+			// post initial state to the channel
+			kc.liveness <- kc.alive
+		}
+	} else {
+		panic("Turning off liveness reporting is not supported")
+	}
+	return kc.liveness
 }
 
 func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
diff --git a/pkg/mocks/kafka/kafka_client_test.go b/pkg/mocks/kafka/kafka_client_test.go
index edd531e..cc3a779 100644
--- a/pkg/mocks/kafka/kafka_client_test.go
+++ b/pkg/mocks/kafka/kafka_client_test.go
@@ -18,11 +18,12 @@
 
 import (
 	"context"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestKafkaClientCreateTopic(t *testing.T) {
@@ -50,9 +51,9 @@
 	assert.NotNil(t, ch)
 	testCh := make(chan bool)
 	maxWait := 5 * time.Millisecond
-	msg := &ic.InterContainerMessage{
-		Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
-		Body:   nil,
+	msg := &ic.DeviceReason{
+		DeviceId: "1234",
+		Reason:   "mock",
 	}
 	timer := time.NewTimer(maxWait)
 	defer timer.Stop()
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
deleted file mode 100644
index 10fcaf0..0000000
--- a/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import (
-	"context"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/any"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-)
-
-type InvokeRpcArgs struct {
-	Rpc             string
-	ToTopic         *kafka.Topic
-	ReplyToTopic    *kafka.Topic
-	WaitForResponse bool
-	Key             string
-	ParentDeviceId  string
-	KvArgs          map[int]interface{}
-}
-
-type InvokeRpcSpy struct {
-	CallCount int
-	Calls     map[int]InvokeRpcArgs
-	Timeout   bool
-	Response  proto.Message
-}
-
-type InvokeAsyncRpcSpy struct {
-	CallCount int
-	Calls     map[int]InvokeRpcArgs
-	Timeout   bool
-	Response  proto.Message
-}
-
-type MockKafkaICProxy struct {
-	InvokeRpcSpy InvokeRpcSpy
-}
-
-func (s *MockKafkaICProxy) Start(ctx context.Context) error { return nil }
-func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
-	t := kafka.Topic{
-		Name: "test-topic",
-	}
-	return &t
-}
-
-func (s *MockKafkaICProxy) DeleteTopic(ctx context.Context, topic kafka.Topic) error { return nil }
-
-func (s *MockKafkaICProxy) Stop(ctx context.Context) {}
-
-func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
-	waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
-
-	args := make(map[int]interface{}, 4)
-	for k, v := range kvArgs {
-		args[k] = v
-	}
-
-	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
-		Rpc:             rpc,
-		ToTopic:         toTopic,
-		ReplyToTopic:    replyToTopic,
-		WaitForResponse: waitForResponse,
-		Key:             key,
-		KvArgs:          args,
-	}
-
-	chnl := make(chan *kafka.RpcResponse)
-
-	return chnl
-}
-
-func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
-	s.InvokeRpcSpy.CallCount++
-
-	success := true
-
-	args := make(map[int]interface{}, 4)
-	for k, v := range kvArgs {
-		args[k] = v
-	}
-
-	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
-		Rpc:             rpc,
-		ToTopic:         toTopic,
-		ReplyToTopic:    replyToTopic,
-		WaitForResponse: waitForResponse,
-		Key:             key,
-		KvArgs:          args,
-	}
-
-	var response any.Any
-	if s.InvokeRpcSpy.Timeout {
-
-		success = false
-
-		err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-		res, _ := ptypes.MarshalAny(err)
-		response = *res
-	} else {
-		res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
-		response = *res
-	}
-
-	return success, &response
-}
-func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic kafka.Topic, handler interface{}) error {
-	return nil
-}
-func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic kafka.Topic, initialOffset int64) error {
-	return nil
-}
-func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic kafka.Topic) error {
-	return nil
-}
-func (s *MockKafkaICProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	return nil
-}
-func (s *MockKafkaICProxy) SendLiveness(ctx context.Context) error { return nil }