[VOL-2831] - Multiple adapter support

This commit introduces the necessary APIs needed to support
multiple adapters.  It uses the number of replicas of a given
adapter and consistent hashing to determine the target of a
given request.

endpoint_manager.go provides two APIs that will be needed
by components communicating over Kafka:
 - GetEndPoint(): to be called before sending a request to Kafka
 - IsDeviceOwnedByService(): used during device reconciliation

A change is made to the adapter_proxy.go to use this new mechanism
when sending a request to an adapter from another adapter.

The mocks directory was refactored to get around circular package
dependencies.  This implies any component using these mocks will
need to adjust to the new set of directories when using this
library version.

Change-Id: I470cd62fcfd04edc1fd4508400c9619cadaab25a
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
new file mode 100644
index 0000000..34aec95
--- /dev/null
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"context"
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+type InvokeRpcArgs struct { // InvokeRpcArgs is the record the mock stores for one InvokeRPC or InvokeAsyncRPC call.
+	Rpc             string              // rpc argument of the recorded call
+	ToTopic         *kafka.Topic        // toTopic argument of the recorded call
+	ReplyToTopic    *kafka.Topic        // replyToTopic argument of the recorded call
+	WaitForResponse bool                // waitForResponse argument of the recorded call
+	Key             string              // key argument of the recorded call
+	ParentDeviceId  string              // not populated by any method in this file
+	KvArgs          map[int]interface{} // variadic kvArgs of the recorded call, keyed by their position
+}
+
+type InvokeRpcSpy struct { // InvokeRpcSpy holds the call log of MockKafkaICProxy and the canned reply for InvokeRPC.
+	CallCount int                   // call counter; InvokeRPC increments it before recording a call
+	Calls     map[int]InvokeRpcArgs // recorded calls, keyed by the CallCount value at recording time
+	Timeout   bool                  // when true, InvokeRPC returns false and a DEADLINE_EXCEEDED ic.Error
+	Response  proto.Message         // message InvokeRPC marshals into its returned Any when Timeout is false
+}
+
+type InvokeAsyncRpcSpy struct { // InvokeAsyncRpcSpy has the same fields as InvokeRpcSpy; nothing else in this file uses it.
+	CallCount int                   // NOTE(review): presumably counts async calls; no code in this file updates it
+	Calls     map[int]InvokeRpcArgs // NOTE(review): presumably the async call log; no code in this file writes it
+	Timeout   bool                  // NOTE(review): presumably selects a timeout reply; no code in this file reads it
+	Response  proto.Message         // NOTE(review): presumably a canned reply; no code in this file reads it
+}
+
+type MockKafkaICProxy struct { // MockKafkaICProxy is a mock: the Invoke* methods record calls, the rest return fixed values.
+	InvokeRpcSpy InvokeRpcSpy // call log written by InvokeRPC and InvokeAsyncRPC, plus the canned reply for InvokeRPC
+}
+
+func (s *MockKafkaICProxy) Start() error { return nil } // Start does nothing and always returns nil.
+// GetDefaultTopic returns a pointer to a newly built kafka.Topic whose Name
+// is "test-topic".
+func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
+	// A fresh value is built on every call, so callers never share a topic.
+	return &kafka.Topic{Name: "test-topic"}
+}
+func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil } // DeleteTopic ignores topic and always returns nil.
+func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+	return nil // DeviceDiscovered ignores all four arguments and always returns nil.
+}
+func (s *MockKafkaICProxy) Stop() {} // Stop does nothing.
+
+// InvokeAsyncRPC records the call in InvokeRpcSpy (ctx is not consulted) and returns a channel nothing is ever sent on.
+func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+	waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
+	s.InvokeRpcSpy.CallCount++ // count first, as InvokeRPC does, so this call gets its own slot in Calls
+	args := make(map[int]interface{}, len(kvArgs))
+	for k, v := range kvArgs {
+		args[k] = v
+	}
+	if s.InvokeRpcSpy.Calls == nil { // a zero-value mock must not panic on the map write below
+		s.InvokeRpcSpy.Calls = make(map[int]InvokeRpcArgs)
+	}
+	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+		Rpc:             rpc,
+		ToTopic:         toTopic,
+		ReplyToTopic:    replyToTopic,
+		WaitForResponse: waitForResponse,
+		Key:             key,
+		KvArgs:          args,
+	}
+	return make(chan *kafka.RpcResponse) // unbuffered; this mock never sends on it or closes it
+}
+
+// InvokeRPC records the call in InvokeRpcSpy and returns a canned reply: false with a DEADLINE_EXCEEDED ic.Error when
+// InvokeRpcSpy.Timeout is set, otherwise true with InvokeRpcSpy.Response packed into an Any. ctx is not consulted.
+func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
+	s.InvokeRpcSpy.CallCount++
+
+	args := make(map[int]interface{}, len(kvArgs))
+	for k, v := range kvArgs {
+		args[k] = v
+	}
+	if s.InvokeRpcSpy.Calls == nil { // a zero-value mock must not panic on the map write below
+		s.InvokeRpcSpy.Calls = make(map[int]InvokeRpcArgs)
+	}
+	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+		Rpc:             rpc,
+		ToTopic:         toTopic,
+		ReplyToTopic:    replyToTopic,
+		WaitForResponse: waitForResponse,
+		Key:             key,
+		KvArgs:          args,
+	}
+
+	// Choose the canned reply: a deadline error when Timeout is set, the configured Response otherwise.
+	success := !s.InvokeRpcSpy.Timeout
+	payload := s.InvokeRpcSpy.Response
+	if !success {
+		payload = &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+	}
+	res, err := ptypes.MarshalAny(payload)
+	if err != nil || res == nil {
+		// For example Response was never set: report failure with an empty Any instead of dereferencing nil.
+		return false, &any.Any{}
+	}
+	return success, res
+}
+func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
+	return nil // SubscribeWithRequestHandlerInterface ignores topic and handler and always returns nil.
+}
+func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
+	return nil // SubscribeWithDefaultRequestHandler ignores topic and initialOffset and always returns nil.
+}
+func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil } // UnSubscribeFromRequestHandler ignores topic and always returns nil.
+func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool           { return nil } // EnableLivenessChannel ignores enable and returns a nil channel.
+func (s *MockKafkaICProxy) SendLiveness() error                                   { return nil } // SendLiveness does nothing and always returns nil.