[VOL-2831] - Multiple adapter support

This commit introduces the necessary APIs needed to support
multiple adapters.  It uses the number of replicas of a given
adapter and consistent hashing to determine the target of a
given request.

The endpoint_manager.go provides two APIs that will be needed
by components communicating over kafka:
 - GetEndPoint() : to be called before sending a request to kafka
 - IsDeviceOwnedByService(): used during device reconciliation

A change is made to the adapter_proxy.go to use this new mechanism
when sending a request to an adapter from another adapter.

The mocks directory was refactored to get around circular package
dependencies.  This implies any component using these mocks will
need to adjust to the new set of directories when using this
library version.

Change-Id: I470cd62fcfd04edc1fd4508400c9619cadaab25a
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
new file mode 100644
index 0000000..5922ce2
--- /dev/null
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// static check to ensure KafkaClient implements kafka.Client
+var _ kafka.Client = &KafkaClient{}
+
+type KafkaClient struct {
+	topicsChannelMap map[string][]chan *ic.InterContainerMessage
+	lock             sync.RWMutex
+}
+
+func NewKafkaClient() *KafkaClient {
+	return &KafkaClient{
+		topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
+		lock:             sync.RWMutex{},
+	}
+}
+
+func (kc *KafkaClient) Start() error {
+	logger.Debug("kafka-client-started")
+	return nil
+}
+
+func (kc *KafkaClient) Stop() {
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	for topic, chnls := range kc.topicsChannelMap {
+		for _, c := range chnls {
+			close(c)
+		}
+		delete(kc.topicsChannelMap, topic)
+	}
+	logger.Debug("kafka-client-stopped")
+}
+
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+	logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
+		return fmt.Errorf("Topic %s already exist", topic.Name)
+	}
+	ch := make(chan *ic.InterContainerMessage)
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+	return nil
+}
+
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
+	logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	delete(kc.topicsChannelMap, topic.Name)
+	return nil
+}
+
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+	logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	ch := make(chan *ic.InterContainerMessage)
+	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
+	return ch, nil
+}
+
+func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+	s[i] = s[len(s)-1]
+	return s[:len(s)-1]
+}
+
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+	logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
+		idx := -1
+		for i, c := range chnls {
+			if c == ch {
+				close(c)
+				idx = i
+			}
+		}
+		if idx >= 0 {
+			kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
+		}
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
+	logger.Debug("SubscribeForMetadata - unimplemented")
+}
+
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
+	req, ok := msg.(*ic.InterContainerMessage)
+	if !ok {
+		return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+	}
+	if req == nil {
+		return status.Error(codes.InvalidArgument, "msg-nil")
+	}
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	for _, ch := range kc.topicsChannelMap[topic.Name] {
+		logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
+		ch <- req
+	}
+	return nil
+}
+
+func (kc *KafkaClient) SendLiveness() error {
+	return status.Error(codes.Unimplemented, "SendLiveness")
+}
+
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+	logger.Debug("EnableLivenessChannel - unimplemented")
+	return nil
+}
+
+func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+	logger.Debug("EnableHealthinessChannel - unimplemented")
+	return nil
+}