[VOL-2831] - Multiple adapter support

This commit introduces the necessary APIs needed to support
multiple adapters.  It uses the number of replicas of a given
adapter and consistent hashing to determine the target of a
given request.

The endpoint_manager.go provides two APIs that will be needed
by components communicating over Kafka:
 - GetEndPoint(): to be called before sending a request to Kafka
 - IsDeviceOwnedByService(): used during device reconciliation

A change is made to the adapter_proxy.go to use this new mechanism
when sending a request to an adapter from another adapter.

The mocks directory was refactored to get around circular package
dependencies.  This implies any component using these mocks will
need to adjust to the new set of directories when using this
library version.

Change-Id: I470cd62fcfd04edc1fd4508400c9619cadaab25a
diff --git a/pkg/mocks/kafka/kafka_client_test.go b/pkg/mocks/kafka/kafka_client_test.go
new file mode 100644
index 0000000..0e35ec1
--- /dev/null
+++ b/pkg/mocks/kafka/kafka_client_test.go
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"testing"
+	"time"
+
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestKafkaClientCreateTopic expects the first CreateTopic call for a topic to
+// return nil and a repeated call for the same topic to return an error.
+func TestKafkaClientCreateTopic(t *testing.T) {
+	client := NewKafkaClient()
+	topic := &kafka.Topic{Name: "myTopic"}
+	assert.Nil(t, client.CreateTopic(topic, 1, 1))
+	assert.NotNil(t, client.CreateTopic(topic, 1, 1))
+}
+
+// TestKafkaClientDeleteTopic expects DeleteTopic to return nil even though this
+// test never calls CreateTopic for the topic first.
+func TestKafkaClientDeleteTopic(t *testing.T) {
+	client := NewKafkaClient()
+	assert.Nil(t, client.DeleteTopic(&kafka.Topic{Name: "myTopic"}))
+}
+
+// TestKafkaClientSubscribeSend subscribes to a topic, sends one message to that
+// topic and expects the same message to arrive on the subscription channel.
+func TestKafkaClientSubscribeSend(t *testing.T) {
+	cTkc := NewKafkaClient()
+	topic := kafka.Topic{Name: "myTopic"}
+	ch, err := cTkc.Subscribe(&topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, ch)
+	testCh := make(chan bool)
+	// Generous deadline: it only elapses when delivery fails, so passing runs stay fast.
+	maxWait := time.Second
+	msg := &ic.InterContainerMessage{
+		Header: &ic.Header{Id: "1234", ToTopic: topic.Name},
+	}
+	timer := time.NewTimer(maxWait)
+	defer timer.Stop()
+	go func() {
+		select {
+		case val, ok := <-ch:
+			assert.True(t, ok)
+			assert.Equal(t, msg, val) // testify order: expected, then actual
+			testCh <- true
+		case <-timer.C:
+			testCh <- false
+		}
+	}()
+	assert.Nil(t, cTkc.Send(msg, &topic))
+	assert.True(t, <-testCh)
+}
+
+// TestKafkaClientUnSubscribe expects UnSubscribe to return nil for a channel obtained from Subscribe.
+func TestKafkaClientUnSubscribe(t *testing.T) {
+	client := NewKafkaClient()
+	topic := &kafka.Topic{Name: "myTopic"}
+	sub, err := client.Subscribe(topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, sub)
+	assert.Nil(t, client.UnSubscribe(topic, sub))
+}
+
+// TestKafkaClientStop subscribes, unsubscribes and then calls Stop on the client.
+func TestKafkaClientStop(t *testing.T) {
+	client := NewKafkaClient()
+	topic := &kafka.Topic{Name: "myTopic"}
+	sub, err := client.Subscribe(topic)
+	assert.Nil(t, err)
+	assert.NotNil(t, sub)
+	assert.Nil(t, client.UnSubscribe(topic, sub))
+	client.Stop() // nothing is asserted here; the call only has to complete
+}