[VOL-2831] - Multiple adapter support

This commit introduces the necessary APIs needed to support
multiple adapters.  It uses the number of replicas of a given
adapter and consistent hashing to determine the target of a
given request.

The endpoint_manager.go provides two APIs that will be needed
by components communicating over kafka:
 - GetEndPoint() : to be called before sending a request to kafka
 - IsDeviceOwnedByService(): used during device reconciliation

A change is made to the adapter_proxy.go to use this new mechanism
when sending a request to an adapter from another adapter.

The mocks directory was refactored to get around circular package
dependencies.  This implies any component using these mocks will
need to adjust to the new set of directories when using this
library version.

Change-Id: I470cd62fcfd04edc1fd4508400c9619cadaab25a
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
 
 import (
 	"context"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
 	kafkaICProxy kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	endpointMgr  kafka.EndpointManager
 }
 
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
-	var proxy AdapterProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.adapterTopic = adapterTopic
-	proxy.coreTopic = coreTopic
-	logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+	proxy := AdapterProxy{
+		kafkaICProxy: kafkaProxy,
+		adapterTopic: adapterTopic,
+		coreTopic:    coreTopic,
+		endpointMgr:  kafka.NewEndpointManager(backend),
+	}
+	logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 	return &proxy
 }
 
@@ -87,7 +91,11 @@
 	}
 
 	// Set up the required rpc arguments
-	topic := kafka.Topic{Name: toAdapter}
+	endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	topic := kafka.Topic{Name: string(endpoint)}
 	replyToTopic := kafka.Topic{Name: fromAdapter}
 	rpc := "process_inter_adapter_message"
 
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index ee9c5b0..a1b4290 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -19,7 +19,7 @@
 	"context"
 	adapterIf "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mocks "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index 0cceb00..865d239 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -22,7 +22,7 @@
 	"testing"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mocks "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
diff --git a/pkg/kafka/endpoint_manager.go b/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..4c13c76
--- /dev/null
+++ b/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seems to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// getReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
+	// test only
+	getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return false, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return 0, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(); err != nil {
+			return nil, "", err
+		}
+
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(context.Background(), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(context.Background(), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+
+	// Log the loaded data in debug mode to facilitate trouble shooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getServiceType() string {
+	return m.serviceType
+}
diff --git a/pkg/kafka/endpoint_manager_test.go b/pkg/kafka/endpoint_manager_test.go
new file mode 100644
index 0000000..0ed5a3d
--- /dev/null
+++ b/pkg/kafka/endpoint_manager_test.go
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/golang/protobuf/proto"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"math"
+	"testing"
+	"time"
+)
+
+type EPTest struct {
+	etcdServer  *etcd.EtcdServer
+	backend     *db.Backend
+	maxReplicas int
+	minReplicas int
+}
+
+func newEPTest(minReplicas, maxReplicas int) *EPTest {
+	test := &EPTest{
+		minReplicas: minReplicas,
+		maxReplicas: maxReplicas,
+	}
+
+	// Create backend
+	if err := test.initBackend(); err != nil {
+		logger.Fatalw("setting-backend-failed", log.Fields{"error": err})
+	}
+
+	// Populate backend with data
+	if err := test.populateBackend(); err != nil {
+		logger.Fatalw("populating-db-failed", log.Fields{"error": err})
+	}
+	return test
+}
+
+func (ep *EPTest) initBackend() error {
+	configName := "voltha-lib.kafka.ep.test"
+	storageDir := "voltha-lib.kafka.ep.etcd"
+	logLevel := "error"
+	timeout := time.Duration(5 * time.Second)
+
+	kvClientPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+	ep.etcdServer = etcd.StartEtcdServer(etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	if ep.etcdServer == nil {
+		return status.Error(codes.Internal, "Embedded server failed to start")
+	}
+
+	ep.backend = db.NewBackend("etcd", "127.0.0.1", kvClientPort, int(timeout.Milliseconds()), "service/voltha")
+	return nil
+}
+
+func (ep *EPTest) stopAll() {
+	if ep.etcdServer != nil {
+		ep.etcdServer.Stop()
+	}
+}
+
+func (ep *EPTest) populateBackend() error {
+	// Add an adapter with multiple replicas
+	adapterPrefix := "adapter_brcm_openomci_onu"
+	numReplicas := ep.maxReplicas
+	for i := 0; i < numReplicas; i++ {
+		adapter := &voltha.Adapter{
+			Id:             fmt.Sprintf("%s_%d", adapterPrefix, i),
+			Vendor:         "VOLTHA OpenONU",
+			Version:        "2.4.0-dev0",
+			Type:           adapterPrefix,
+			CurrentReplica: int32(i),
+			TotalReplicas:  int32(numReplicas),
+			Endpoint:       fmt.Sprintf("%s_%d", adapterPrefix, i),
+		}
+		adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
+		blob, err := proto.Marshal(adapter)
+		if err != nil {
+			return err
+		}
+		if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
+			return err
+		}
+	}
+
+	// Add an adapter with minreplicas
+	adapterPrefix = "adapter_openolt"
+	numReplicas = ep.minReplicas
+	for i := 0; i < numReplicas; i++ {
+		adapter := &voltha.Adapter{
+			Id:             fmt.Sprintf("%s_%d", adapterPrefix, i),
+			Vendor:         "VOLTHA OpenOLT",
+			Version:        "2.3.1-dev",
+			Type:           adapterPrefix,
+			CurrentReplica: int32(i),
+			TotalReplicas:  int32(numReplicas),
+			Endpoint:       fmt.Sprintf("%s_%d", adapterPrefix, i),
+		}
+		adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
+		blob, err := proto.Marshal(adapter)
+		if err != nil {
+			return err
+		}
+		if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
+			return err
+		}
+	}
+
+	// Add the brcm_openomci_onu device type
+	dType := "brcm_openomci_onu"
+	adapterName := "adapter_brcm_openomci_onu"
+	deviceType := &voltha.DeviceType{
+		Id:                          dType,
+		VendorIds:                   []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
+		Adapter:                     adapterName,
+		AcceptsAddRemoveFlowUpdates: true,
+	}
+	blob, err := proto.Marshal(deviceType)
+	if err != nil {
+		return err
+	}
+	if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
+		return err
+	}
+
+	// Add the openolt device type
+	dType = "openolt"
+	adapterName = "adapter_openolt"
+	deviceType = &voltha.DeviceType{
+		Id:                          dType,
+		Adapter:                     adapterName,
+		AcceptsAddRemoveFlowUpdates: true,
+	}
+	blob, err = proto.Marshal(deviceType)
+	if err != nil {
+		return err
+	}
+	if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
+		return err
+	}
+	return nil
+}
+
+func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
+	var sum, mean, sd float64
+	for i := 0; i < replicas; i++ {
+		sum += float64(val[i])
+	}
+	mean = sum / float64(replicas)
+
+	for j := 0; j < replicas; j++ {
+		sd += math.Pow(float64(val[j])-mean, 2)
+	}
+	sd = math.Sqrt(sd / float64(replicas))
+	return mean, sd
+}
+
+func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, serviceType string, deviceType string, replicas int) {
+	// Map of device ids to topic
+	deviceIDs := make(map[string]Endpoint)
+	numDevices := 1000
+	total := make([]int, replicas)
+	for i := 0; i < numDevices; i++ {
+		deviceID := uuid.New().String()
+		endpoint, err := tm.GetEndpoint(deviceID, serviceType)
+		if err != nil {
+			logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
+		}
+		deviceIDs[deviceID] = endpoint
+		replicaID, err := tm.getReplicaAssignment(deviceID, serviceType)
+		if err != nil {
+			logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
+		}
+		total[replicaID] += 1
+	}
+
+	mean, sdtDev := getMeanAndStdDeviation(total, replicas)
+	fmt.Println(fmt.Sprintf("Device distributions => devices:%d service_replicas:%d mean:%d standard_deviation:%d, distributions:%v", numDevices, replicas, int(mean), int(sdtDev), total))
+
+	// Verify that we get the same topic for a given device ID, irrespective of the number of iterations
+	numIterations := 10
+	for i := 0; i < numIterations; i++ {
+		for deviceID, expectedEndpoint := range deviceIDs {
+			endpointByServiceType, err := tm.GetEndpoint(deviceID, serviceType)
+			if err != nil {
+				logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
+			}
+			assert.Equal(t, expectedEndpoint, endpointByServiceType)
+		}
+	}
+
+	// Verify that a device belong to the correct node
+	for deviceID := range deviceIDs {
+		replicaID, err := tm.getReplicaAssignment(deviceID, serviceType)
+		if err != nil {
+			logger.Fatalw("error-getting-topic", log.Fields{"error": err})
+		}
+		for k := 0; k < replicas; k++ {
+			owned, err := tm.IsDeviceOwnedByService(deviceID, serviceType, int32(k))
+			if err != nil {
+				logger.Fatalw("error-verifying-device-ownership", log.Fields{"error": err})
+			}
+			assert.Equal(t, ReplicaID(k) == replicaID, owned)
+		}
+	}
+}
+
+func TestEndpointManagerSuite(t *testing.T) {
+	tmt := newEPTest(1, 10)
+	assert.NotNil(t, tmt)
+
+	tm := NewEndpointManager(
+		tmt.backend,
+		PartitionCount(1117),
+		ReplicationFactor(200),
+		Load(1.1))
+
+	defer tmt.stopAll()
+
+	//1. Test APIs with multiple replicas
+	tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
+
+	//2. Test APIs with single replica
+	tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
+}
diff --git a/pkg/mocks/common.go b/pkg/mocks/etcd/common.go
similarity index 98%
rename from pkg/mocks/common.go
rename to pkg/mocks/etcd/common.go
index 90612bb..a45b4b2 100644
--- a/pkg/mocks/common.go
+++ b/pkg/mocks/etcd/common.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package etcd
 
 import (
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/pkg/mocks/etcd_server.go b/pkg/mocks/etcd/etcd_server.go
similarity index 99%
rename from pkg/mocks/etcd_server.go
rename to pkg/mocks/etcd/etcd_server.go
index 487b991..b4e201d 100644
--- a/pkg/mocks/etcd_server.go
+++ b/pkg/mocks/etcd/etcd_server.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package etcd
 
 import (
 	"fmt"
diff --git a/pkg/mocks/etcd_server_test.go b/pkg/mocks/etcd/etcd_server_test.go
similarity index 99%
rename from pkg/mocks/etcd_server_test.go
rename to pkg/mocks/etcd/etcd_server_test.go
index 7a861d4..daf2491 100644
--- a/pkg/mocks/etcd_server_test.go
+++ b/pkg/mocks/etcd/etcd_server_test.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package mocks
+package etcd
 
 import (
 	"context"
diff --git a/pkg/mocks/common.go b/pkg/mocks/kafka/common.go
similarity index 98%
copy from pkg/mocks/common.go
copy to pkg/mocks/kafka/common.go
index 90612bb..05bc5f9 100644
--- a/pkg/mocks/common.go
+++ b/pkg/mocks/kafka/common.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package kafka
 
 import (
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/pkg/mocks/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
similarity index 99%
rename from pkg/mocks/kafka_client.go
rename to pkg/mocks/kafka/kafka_client.go
index 38f147e..5922ce2 100644
--- a/pkg/mocks/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package kafka
 
 import (
 	"fmt"
diff --git a/pkg/mocks/kafka_client_test.go b/pkg/mocks/kafka/kafka_client_test.go
similarity index 99%
rename from pkg/mocks/kafka_client_test.go
rename to pkg/mocks/kafka/kafka_client_test.go
index dcf1973..0e35ec1 100644
--- a/pkg/mocks/kafka_client_test.go
+++ b/pkg/mocks/kafka/kafka_client_test.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package mocks
+package kafka
 
 import (
 	"testing"
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
similarity index 99%
rename from pkg/mocks/kafka_inter_container_proxy.go
rename to pkg/mocks/kafka/kafka_inter_container_proxy.go
index 9879830..34aec95 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package mocks
+package kafka
 
 import (
 	"context"