[VOL-2835] Use a different topic per ONU device

Instead of one shared Kafka topic per adapter type, rw_core now resolves
a per-device topic through kafka.EndpointManager, which maps a device ID
onto one of the adapter's replica endpoints. The sentinel adapter and the
device-type-to-adapter map are removed in favor of a direct device-type
registry, and adapter registration and logging now carry the replica and
endpoint information.

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
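
For context, this change vendors github.com/buraksezer/consistent (diff below), presumably in support of voltha-lib-go's kafka.EndpointManager, which maps a device ID onto one of an adapter's replica endpoints. A minimal standalone sketch of that idea, using hypothetical replica/topic names and illustrative ring parameters (not the library's actual defaults):

```go
package main

import (
	"fmt"

	"github.com/buraksezer/consistent"
	"github.com/cespare/xxhash"
)

// hasher satisfies consistent.Hasher; xxhash gives a uniform 64-bit hash.
type hasher struct{}

func (hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) }

// endpoint stands in for one adapter replica listening on its own Kafka
// topic. The replica names below are hypothetical.
type endpoint string

func (e endpoint) String() string { return string(e) }

func main() {
	ring := consistent.New(nil, consistent.Config{
		PartitionCount:    271, // illustrative values only
		ReplicationFactor: 20,
		Load:              1.25,
		Hasher:            hasher{},
	})

	// One ring member per adapter replica.
	ring.Add(endpoint("adapter_onu_1"))
	ring.Add(endpoint("adapter_onu_2"))

	// The same device ID always maps to the same replica, i.e. the same topic.
	fmt.Println(ring.LocateKey([]byte("onu-device-1234")))
}
```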
diff --git a/BUILD.md b/BUILD.md
index 0127947..7572210 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -1,5 +1,7 @@
 # How to Build and Develop VOLTHA
 
+Please refer to this guide: https://docs.voltha.org/master/overview/dev_virtual.html
+
 ## Building natively on MAC OS X
 
 For advanced developers this may provide a more comfortable developer environment
diff --git a/VERSION b/VERSION
index eef2640..a724a9c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.0-dev0
+2.4.0-dev
diff --git a/compose/system-test-bbsim.yml b/compose/system-test-bbsim.yml
index 4a6e9d6..3370111 100644
--- a/compose/system-test-bbsim.yml
+++ b/compose/system-test-bbsim.yml
@@ -26,9 +26,9 @@
     environment:
       SERVICE_2181_NAME: "zookeeper"
     ports:
-    - 2181:2181
+      - 2181:2181
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -43,9 +43,9 @@
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock
     ports:
-     - 9092:9092
+      - 9092:9092
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -63,37 +63,37 @@
       "--initial-cluster-state=new"
     ]
     ports:
-    - "2379:2379"
-    - 2380
-    - 4001
+      - "2379:2379"
+      - 2380
+      - 4001
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
   rw_core:
     image: "${DOCKER_REGISTRY}${DOCKER_REPOSITORY}voltha-rw-core:${DOCKER_TAG}"
     entrypoint:
-        - /app/rw_core
-        - -kv_store_type=etcd
-        - -kv_store_host=${DOCKER_HOST_IP}
-        - -kv_store_port=2379
-        - -grpc_port=50057
-        - -banner=true
-        - -kafka_adapter_host=${DOCKER_HOST_IP}
-        - -kafka_adapter_port=9092
-        - -kafka_cluster_host=${DOCKER_HOST_IP}
-        - -kafka_cluster_port=9092
-        - -rw_core_topic=rwcore
-        - -kv_store_data_prefix=service/voltha
-        - -in_competing_mode=false
-        - -log_level=DEBUG
+      - /app/rw_core
+      - -kv_store_type=etcd
+      - -kv_store_host=${DOCKER_HOST_IP}
+      - -kv_store_port=2379
+      - -grpc_port=50057
+      - -banner=true
+      - -kafka_adapter_host=${DOCKER_HOST_IP}
+      - -kafka_adapter_port=9092
+      - -kafka_cluster_host=${DOCKER_HOST_IP}
+      - -kafka_cluster_port=9092
+      - -rw_core_topic=rwcore
+      - -kv_store_data_prefix=service/voltha
+      - -in_competing_mode=false
+      - -log_level=DEBUG
     volumes:
-    - "/var/run/docker.sock:/tmp/docker.sock"
+      - "/var/run/docker.sock:/tmp/docker.sock"
     ports:
       - 50057:50057
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -108,23 +108,23 @@
     volumes:
     - "/var/run/docker.sock:/tmp/docker.sock"
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
   onos:
     image: "${DOCKER_REGISTRY}${DOCKER_REPOSITORY}voltha-onos:${DOCKER_TAG}"
     ports:
-    - "8101:8101" # ssh
-    - "6653:6653" # OF
-    - "8181:8181" # UI
+      - "8101:8101" # ssh
+      - "6653:6653" # OF
+      - "8181:8181" # UI
     environment:
       ONOS_APPS: 'drivers,openflow-base'
     volumes:
-    - "/var/run/docker.sock:/tmp/docker.sock"
-    - "./network-cfg-bbsim.json:/root/onos/config/network-cfg.json"
+      - "/var/run/docker.sock:/tmp/docker.sock"
+      - "./network-cfg-bbsim.json:/root/onos/config/network-cfg.json"
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -144,7 +144,7 @@
     ports:
       - "50062:50062"
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -161,7 +161,7 @@
       "--log_level=DEBUG"
     ]
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -180,10 +180,10 @@
       "1"
     ]
     ports:
-    - "50060:50060"
-    - "50074:50074"
+      - "50060:50060"
+      - "50074:50074"
     networks:
-    - default
+      - default
     restart: unless-stopped
 
 
@@ -197,9 +197,9 @@
       - "./radius-clients.conf:/etc/raddb/clients.conf"
       - "./radius-users.conf:/etc/raddb/users"
     ports:
-     - "1812:1812/udp"
-     - "1813:1813"
-     - "18120:18120"
+      - "1812:1812/udp"
+      - "1813:1813"
+      - "18120:18120"
     networks:
-    - default
-    restart: unless-stopped
+      - default
+    restart: unless-stopped
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 2268c09..2a2b789 100644
--- a/go.mod
+++ b/go.mod
@@ -6,8 +6,8 @@
 	github.com/gogo/protobuf v1.3.0
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
-	github.com/opencord/voltha-lib-go/v3 v3.1.0
-	github.com/opencord/voltha-protos/v3 v3.2.8
+	github.com/opencord/voltha-lib-go/v3 v3.1.2
+	github.com/opencord/voltha-protos/v3 v3.3.0
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
 	github.com/stretchr/testify v1.4.0
 	google.golang.org/grpc v1.24.0
diff --git a/go.sum b/go.sum
index 3a03962..61fd9aa 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@
 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
 github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
 github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
 github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
@@ -25,6 +27,10 @@
 github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d/go.mod h1:f1iKL6ZhUWvbk7PdWVmOaak10o86cqMUYEmn1CZNGEI=
 github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A=
 github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
+github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73 h1:q1g9lSyo/nOIC3W5E3FK3Unrz8b9LdLXCyuC+ZcpPC0=
 github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73/go.mod h1:507vXsotcZop7NZfBWdhPmVeOse4ko2R7AagJYrpoEg=
 github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
@@ -190,10 +196,10 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.1.0 h1:3KPBQT2s5HdCo6bwWHpNFeR8KYbDQMmZEq+aHim6U/o=
-github.com/opencord/voltha-lib-go/v3 v3.1.0/go.mod h1:iOFpFIonmGq6nYG4qYWrF5cKyt7UupDLyB4iGXFDRig=
-github.com/opencord/voltha-protos/v3 v3.2.8 h1:FuMEmUsW4WRpIsNCcELM9QBZZwBLRvuw85DVvfpZ1do=
-github.com/opencord/voltha-protos/v3 v3.2.8/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
+github.com/opencord/voltha-lib-go/v3 v3.1.2 h1:1BUoi8cljTUlYueGCZCtrWf/a5+KhNupPTILOWEDZVo=
+github.com/opencord/voltha-lib-go/v3 v3.1.2/go.mod h1:ad7C/5/09RcYvGQrxUH4AuOiO8OSQqGmCgEJNEpaag8=
+github.com/opencord/voltha-protos/v3 v3.3.0 h1:1Q1C6nWSkjaJY87GQgc7hWU6kqjkWdM+rzqSXBKb0cQ=
+github.com/opencord/voltha-protos/v3 v3.3.0/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
 github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -234,6 +240,8 @@
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 1ed5b23..faece55 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -32,12 +32,6 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
-// sentinel constants
-const (
-	SentinelAdapterID    = "adapter_sentinel"
-	SentinelDevicetypeID = "device_type_sentinel"
-)
-
 // AdapterAgent represents adapter agent
 type AdapterAgent struct {
 	adapter     *voltha.Adapter
@@ -58,15 +52,6 @@
 	return &adapterAgent
 }
 
-func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
-	if _, exist := aa.deviceTypes[deviceType]; exist {
-		return aa.deviceTypes[deviceType]
-	}
-	return nil
-}
-
 func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
 	aa.lock.RLock()
 	defer aa.lock.RUnlock()
@@ -74,12 +59,6 @@
 	return aa.adapter
 }
 
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
-	aa.lock.Lock()
-	defer aa.lock.Unlock()
-	aa.deviceTypes[deviceType.Id] = deviceType
-}
-
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
 func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
@@ -99,7 +78,7 @@
 // AdapterManager represents adapter manager attributes
 type AdapterManager struct {
 	adapterAgents               map[string]*AdapterAgent
-	deviceTypeToAdapterMap      map[string]string
+	deviceTypes                 map[string]*voltha.DeviceType
 	clusterDataProxy            *model.Proxy
 	deviceMgr                   *DeviceManager
 	coreInstanceID              string
@@ -110,12 +89,12 @@
 
 func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
 	aMgr := &AdapterManager{
-		exitChannel:            make(chan int, 1),
-		coreInstanceID:         coreInstanceID,
-		clusterDataProxy:       cdProxy,
-		adapterAgents:          make(map[string]*AdapterAgent),
-		deviceTypeToAdapterMap: make(map[string]string),
-		deviceMgr:              deviceMgr,
+		exitChannel:      make(chan int, 1),
+		coreInstanceID:   coreInstanceID,
+		clusterDataProxy: cdProxy,
+		deviceTypes:      make(map[string]*voltha.DeviceType),
+		adapterAgents:    make(map[string]*AdapterAgent),
+		deviceMgr:        deviceMgr,
 	}
 	kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
 	return aMgr
@@ -153,10 +132,6 @@
 				logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
 			}
 		}
-	} else {
-		logger.Debug("no-existing-adapter-found")
-		//	No adapter data.   In order to have a proxy setup for that path let's create a fake adapter
-		return aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
 	}
 
 	// Load the device types
@@ -175,8 +150,8 @@
 	}
 
 	logger.Debug("no-existing-device-type-found")
-	//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
-	return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+
+	return nil
 }
 
 func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
@@ -192,7 +167,8 @@
 func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
-	logger.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+	logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 	if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
 		if saveToDb {
 			// Save the adapter to the KV store - first check if it already exist
@@ -201,10 +177,17 @@
 				return err
 			} else if !have {
 				if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
-					logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+					logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+						"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
 					return err
 				}
-				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+					"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
+			} else {
+				logger.Warnw("adding-adapter-already-in-KV-store", log.Fields{
+					"adapterName":    adapter.Id,
+					"adapterReplica": adapter.CurrentReplica,
+				})
 			}
 		}
 		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
@@ -223,6 +206,11 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
+	// Build an in-memory map so the full voltha.DeviceType can be looked up from a device type string
+	for _, deviceType := range deviceTypes.Items {
+		aMgr.deviceTypes[deviceType.Id] = deviceType
+	}
+
 	if saveToDb {
 		// Save the device types to the KV store
 		for _, deviceType := range deviceTypes.Items {
@@ -240,17 +228,7 @@
 			}
 		}
 	}
-	// and save locally
-	for _, deviceType := range deviceTypes.Items {
-		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
-		if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
-			adapterAgent.updateDeviceType(clonedDType)
-		} else {
-			logger.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
-			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
-		}
-		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
-	}
+
 	return nil
 }
 
@@ -260,9 +238,7 @@
 	defer aMgr.lockAdaptersMap.RUnlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
 		if a := adapterAgent.getAdapter(); a != nil {
-			if a.Id != SentinelAdapterID { // don't report the sentinel
-				result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
-			}
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
 		}
 	}
 	return result, nil
@@ -278,7 +254,8 @@
 }
 
 func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
-	logger.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+	logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
 
 	if aMgr.getAdapter(adapter.Id) != nil {
 		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
@@ -300,17 +277,20 @@
 		return nil, err
 	}
 
-	logger.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
 }
 
-//getAdapterName returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+// getAdapterType returns the type of the adapter that services this device type
+func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		return adapterID, nil
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if deviceType == adapterAgent.adapter.Type {
+			return adapterAgent.adapter.Type, nil
+		}
 	}
 	return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
 }
@@ -319,16 +299,12 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypeToAdapterMap))
-	for deviceTypeID, adapterID := range aMgr.deviceTypeToAdapterMap {
-		if adapterAgent, have := aMgr.adapterAgents[adapterID]; have {
-			if deviceType := adapterAgent.getDeviceType(deviceTypeID); deviceType != nil {
-				if deviceType.Id != SentinelDevicetypeID { // don't report the sentinel
-					deviceTypes = append(deviceTypes, deviceType)
-				}
-			}
-		}
+	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
+
+	for _, deviceType := range aMgr.deviceTypes {
+		deviceTypes = append(deviceTypes, deviceType)
 	}
+
 	return deviceTypes
 }
 
@@ -337,10 +313,9 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		if adapterAgent := aMgr.adapterAgents[adapterID]; adapterAgent != nil {
-			return adapterAgent.getDeviceType(deviceType)
-		}
+	if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
+		return deviceType
 	}
+
 	return nil
 }
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index d3907bb..1e18ba4 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,14 +30,16 @@
 	deviceTopicRegistered bool
 	corePairTopic         string
 	kafkaICProxy          kafka.InterContainerProxy
+	endpointManager       kafka.EndpointManager
 }
 
 // NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
 	return &AdapterProxy{
 		kafkaICProxy:          kafkaProxy,
 		corePairTopic:         corePairTopic,
 		deviceTopicRegistered: false,
+		endpointManager:       endpointManager,
 	}
 }
 
@@ -45,8 +47,14 @@
 	return kafka.Topic{Name: ap.corePairTopic}
 }
 
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
-	return kafka.Topic{Name: adapterName}
+func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
+
+	endpoint, err := ap.endpointManager.GetEndpoint(deviceID, adapterType)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kafka.Topic{Name: string(endpoint)}, nil
 }
 
 func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
@@ -69,167 +77,210 @@
 func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // disableDevice invokes disable device rpc
 func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reEnableDevice invokes reenable device rpc
 func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // rebootDevice invokes reboot device rpc
 func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // deleteDevice invokes delete device rpc
 func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpDeviceInfo invokes get ofp device info rpc
 func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
 	rpc := "get_ofp_device_info"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpPortInfo invokes get ofp port info rpc
 func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reconcileDevice invokes reconcile device rpc
 func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // downloadImage invokes download image rpc
 func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getImageDownloadStatus invokes get image download status rpc
 func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // cancelImageDownload invokes cancel image download rpc
 func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // activateImageUpdate invokes activate image update rpc
 func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // revertImageUpdate invokes revert image update rpc
 func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "receive_packet_out"
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
@@ -237,13 +288,16 @@
 		{Key: "packet", Value: packet},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 // updateFlowsBulk invokes update flows bulk rpc
 func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_bulk"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -252,7 +306,7 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updateFlowsIncremental invokes update flows incremental rpc
@@ -266,7 +320,10 @@
 			"group-to-delete-count": len(groupChanges.ToRemove.Items),
 			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_incrementally"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -275,83 +332,101 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updatePmConfigs invokes update pm configs rpc
 func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "Update_pm_config"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "pm_configs", Value: pmConfigs},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // simulateAlarm invokes simulate alarm rpc
 func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: simulateReq},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // childDeviceLost invokes child device_lost rpc
-func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
-	logger.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+	logger.Debugw("childDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
-		{Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+		{Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
 		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
 		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
 	rpc := "start_omci_test"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
 	//   than including the whole request, which is (deviceid, uuid)
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id,
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
 		&kafka.KVArg{Key: "device", Value: device},
 		&kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
 }
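
Every RPC helper in this file now follows the same resolve-then-send pattern: look up the per-device topic, bail out if no endpoint is known, then send. A hedged sketch of how that repetition could be factored out (`sendToDevice` is hypothetical, not part of this change):

```go
// sendToDevice is a hypothetical consolidation of the pattern repeated above:
// resolve the per-device adapter topic, then send the RPC, surfacing lookup
// failures to the caller instead of silently using a shared topic.
func (ap *AdapterProxy) sendToDevice(ctx context.Context, rpc string, device *voltha.Device, args ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
	if err != nil {
		return nil, err // no replica endpoint known for this device/adapter pair
	}
	replyToTopic := ap.getCoreTopic()
	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
```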
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 8784ff2..718cc30 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -24,7 +24,7 @@
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -60,7 +60,7 @@
 	var err error
 
 	// Create the KV client
-	kc = lm.NewKafkaClient()
+	kc = mock_kafka.NewKafkaClient()
 
 	// Setup core inter-container proxy and core request handler
 	coreKafkaICProxy = kafka.NewInterContainerProxy(
@@ -98,7 +98,7 @@
 }
 
 func TestCreateAdapterProxy(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	assert.NotNil(t, ap)
 }
 
@@ -119,7 +119,7 @@
 
 func testSimpleRequests(t *testing.T) {
 	type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	simpleRequests := []simpleRequest{
 		ap.adoptDevice,
 		ap.disableDevice,
@@ -162,7 +162,7 @@
 }
 
 func testGetSwitchCapabilityFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -179,7 +179,7 @@
 }
 
 func testGetPortInfoFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -197,7 +197,7 @@
 }
 
 func testPacketOut(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	outPort := uint32(1)
 	packet, err := getRandomBytes(50)
@@ -211,7 +211,7 @@
 }
 
 func testFlowUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	_, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
@@ -226,7 +226,7 @@
 }
 
 func testPmUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -236,7 +236,7 @@
 	assert.Nil(t, err)
 }
 
-func TestSuite(t *testing.T) {
+func TestSuiteAdapterProxy(t *testing.T) {
 	//1. Test the simple requests first
 	testSimpleRequests(t)
 
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 9771619..7591c18 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -30,7 +30,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
 	"google.golang.org/grpc/codes"
@@ -75,7 +75,7 @@
 }
 
 //startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
+func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
 	kvClientPort, err := freeport.GetFreePort()
 	if err != nil {
 		return nil, 0, err
@@ -84,14 +84,14 @@
 	if err != nil {
 		return nil, 0, err
 	}
-	etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
 	if etcdServer == nil {
 		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
 	}
 	return etcdServer, kvClientPort, nil
 }
 
-func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
+func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
 	if server != nil {
 		server.Stop()
 	}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 69391fb..dccd271 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -254,15 +254,16 @@
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
-	adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
+	adapterName, err := agent.adapterMgr.getAdapterType(cloned.Type)
 	if err != nil {
 		return err
 	}
 	cloned.Adapter = adapterName
 
 	if cloned.AdminState == voltha.AdminState_ENABLED {
-		logger.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
-		return nil
+		logger.Warnw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
+		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", cloned.Id))
+		return err
 	}
 
 	if cloned.AdminState == voltha.AdminState_DELETED {
@@ -1665,14 +1666,18 @@
 	}
 
 	device := agent.getDeviceWithoutLock()
-	adapterName, err := agent.adapterMgr.getAdapterName(device.Type)
-	if err != nil {
-		agent.requestQueue.RequestComplete()
-		return nil, err
+
+	if device.Adapter == "" {
+		adapterName, err := agent.adapterMgr.getAdapterType(device.Type)
+		if err != nil {
+			agent.requestQueue.RequestComplete()
+			return nil, err
+		}
+
+		device.Adapter = adapterName
 	}
 
 	// Send request to the adapter
-	device.Adapter = adapterName
 	ch, err := agent.adapterProxy.startOmciTest(ctx, device, omcitestrequest)
 	agent.requestQueue.RequestComplete()
 	if err != nil {
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index 4f1506f..f8fb810 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -21,7 +21,8 @@
 	"github.com/opencord/voltha-go/rw_core/config"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -35,7 +36,7 @@
 )
 
 type DATest struct {
-	etcdServer     *lm.EtcdServer
+	etcdServer     *mock_etcd.EtcdServer
 	core           *Core
 	kClient        kafka.Client
 	kvClientPort   int
@@ -57,7 +58,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index afe84e8..18593d8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -56,12 +56,15 @@
 }
 
 func newDeviceManager(core *Core) *DeviceManager {
+
+	endpointManager := kafka.NewEndpointManager(&core.backend)
+
 	var deviceMgr DeviceManager
 	deviceMgr.core = core
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.rootDevices = make(map[string]bool)
 	deviceMgr.kafkaICProxy = core.kmp
-	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
+	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic, endpointManager)
 	deviceMgr.coreInstanceID = core.instanceID
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
@@ -186,7 +189,6 @@
 	} else {
 		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-
 	sendResponse(ctx, ch, res)
 }
 
@@ -601,7 +603,8 @@
 
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
-	logger.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	// Let's reconcile the device managed by this Core only
 	if len(dMgr.rootDevices) == 0 {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 0490c5f..e54c14c 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -34,7 +34,8 @@
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/version"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -45,7 +46,7 @@
 )
 
 type NBTest struct {
-	etcdServer        *lm.EtcdServer
+	etcdServer        *mock_etcd.EtcdServer
 	core              *Core
 	kClient           kafka.Client
 	kvClientPort      int
@@ -69,7 +70,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
@@ -114,9 +115,13 @@
 
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
-		Id:      nb.oltAdapterName,
-		Vendor:  "Voltha-olt",
-		Version: version.VersionInfo.Version,
+		Id:             nb.oltAdapterName,
+		Vendor:         "Voltha-olt",
+		Version:        version.VersionInfo.Version,
+		Type:           nb.oltAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       nb.oltAdapterName,
 	}
 	types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
@@ -134,9 +139,13 @@
 
 	//	Register the adapter
 	registrationData = &voltha.Adapter{
-		Id:      nb.onuAdapterName,
-		Vendor:  "Voltha-onu",
-		Version: version.VersionInfo.Version,
+		Id:             nb.onuAdapterName,
+		Vendor:         "Voltha-onu",
+		Version:        version.VersionInfo.Version,
+		Type:           nb.onuAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       nb.onuAdapterName,
 	}
 	types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes = &voltha.DeviceTypes{Items: types}
@@ -1108,7 +1117,7 @@
 	assert.Nil(t, err)
 }
 
-func TestSuite1(t *testing.T) {
+func TestSuiteNbiApiHandler(t *testing.T) {
 	f, err := os.Create("profile.cpu")
 	if err != nil {
 		logger.Fatalf("could not create CPU profile: %v\n ", err)
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 175fe06..70d809a 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -27,7 +27,8 @@
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -357,7 +358,7 @@
 }
 
 type LDATest struct {
-	etcdServer     *lm.EtcdServer
+	etcdServer     *mock_etcd.EtcdServer
 	core           *Core
 	kClient        kafka.Client
 	kvClientPort   int
@@ -380,7 +381,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
diff --git a/vendor/github.com/buraksezer/consistent/.gitignore b/vendor/github.com/buraksezer/consistent/.gitignore
new file mode 100644
index 0000000..a1338d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.gitignore
@@ -0,0 +1,14 @@
+# Binaries for programs and plugins
+*.exe
+*.dll
+*.so
+*.dylib
+
+# Test binary, build with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
+.glide/
diff --git a/vendor/github.com/buraksezer/consistent/.travis.yml b/vendor/github.com/buraksezer/consistent/.travis.yml
new file mode 100644
index 0000000..4f2ee4d
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/.travis.yml
@@ -0,0 +1 @@
+language: go
diff --git a/vendor/github.com/buraksezer/consistent/LICENSE b/vendor/github.com/buraksezer/consistent/LICENSE
new file mode 100644
index 0000000..e470334
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Burak Sezer
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/buraksezer/consistent/README.md b/vendor/github.com/buraksezer/consistent/README.md
new file mode 100644
index 0000000..bde53d1
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/README.md
@@ -0,0 +1,235 @@
+consistent
+==========
+[![GoDoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/buraksezer/consistent) [![Build Status](https://travis-ci.org/buraksezer/consistent.svg?branch=master)](https://travis-ci.org/buraksezer/consistent) [![Coverage](http://gocover.io/_badge/github.com/buraksezer/consistent)](http://gocover.io/github.com/buraksezer/consistent) [![Go Report Card](https://goreportcard.com/badge/github.com/buraksezer/consistent)](https://goreportcard.com/report/github.com/buraksezer/consistent) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)  
+
+
+This library provides a consistent hashing function which simultaneously achieves both uniformity and consistency. 
+
+For detailed information about the concept, you should take a look at the following resources:
+
+* [Consistent Hashing with Bounded Loads on Google Research Blog](https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html)
+* [Improving load balancing with a new consistent-hashing algorithm on Vimeo Engineering Blog](https://medium.com/vimeo-engineering-blog/improving-load-balancing-with-a-new-consistent-hashing-algorithm-9f1bd75709ed)
+* [Consistent Hashing with Bounded Loads paper on arXiv](https://arxiv.org/abs/1608.01350)
+
+Table of Contents
+-----------------
+
+- [Overview](#overview)
+- [Install](#install)
+- [Configuration](#configuration)
+- [Usage](#usage)
+- [Benchmarks](#benchmarks)
+- [Examples](#examples)
+
+Overview
+--------
+
+In this package's context, keys are distributed among partitions, and partitions are in turn distributed among members.
+
+When you create a new consistent instance or call `Add/Remove`:
+
+* The member's name is hashed and inserted into the hash ring,
+* Average load is calculated by the algorithm defined in the paper,
+* Partitions are distributed among members by hashing partition IDs and none of them exceed the average load.
+
+The average load cannot be exceeded, so if every member is already at maximum load when a new member is added, the call panics.
+
+When you want to locate a key by calling `LocateKey`:
+
+* The key (a byte slice) is hashed,
+* The hash is reduced modulo the number of partitions,
+* The result of this modulo - `MOD(hash result, partition count)` - is the partition in which the key will be located,
+* The owner of the partition was determined in advance, so `LocateKey` returns the partition owner immediately.
+
+Apart from the hashing itself, no memory is allocated by `consistent` when you locate a key.
+
+Note that the number of partitions cannot be changed after creation. 
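+
+As a minimal sketch of the two lookup steps above (assuming `owners` is the partition-to-member table that `Add`/`Remove` maintain internally):
+
+```go
+// locate sketches LocateKey: hash the key, reduce it modulo the partition
+// count, and return the precomputed owner of that partition.
+func locate(h consistent.Hasher, owners []consistent.Member, key []byte) consistent.Member {
+	partID := int(h.Sum64(key) % uint64(len(owners)))
+	return owners[partID]
+}
+```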
+
+Install
+-------
+
+With a correctly configured Go environment:
+
+```
+go get github.com/buraksezer/consistent
+```
+
+You will find some useful usage samples in the [examples](https://github.com/buraksezer/consistent/tree/master/_examples) folder.
+
+Configuration
+-------------
+
+```go
+type Config struct {
+	// Hasher is responsible for generating unsigned, 64 bit hash of provided byte slice.
+	Hasher Hasher
+
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+
+	// Members are replicated on the consistent hash ring. This number
+	// controls how many times each member is replicated on the ring.
+	ReplicationFactor int
+
+	// Load is used to calculate average load. See the code, the paper and Google's 
+	// blog post to learn about it.
+	Load float64
+}
+```
+
+Any hash algorithm that implements the `Hasher` interface can be used. Please take a look at the *Examples* section for a full sample; a minimal hasher also follows below.
+
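+For instance, a minimal `Hasher` built on the `xxhash` package (the same hash function used in the *Examples* section below) could look like this:
+
+```go
+import "github.com/cespare/xxhash"
+
+type hasher struct{}
+
+// Sum64 satisfies the Hasher interface by delegating to xxhash.
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+```
+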
+Usage
+-----
+
+The `LocateKey` function finds a member in the cluster for your key:
+```go
+// With a properly configured and initialized consistent instance
+key := []byte("my-key")
+member := c.LocateKey(key)
+```
+It returns a thread-safe copy of the member you added before.
+
+The second most frequently used function is `GetClosestN`. 
+
+```go
+// With a properly configured and initialized consistent instance
+
+key := []byte("my-key")
+members, err := c.GetClosestN(key, 2)
+```
+
+This can be useful for finding backup nodes to store your key, as sketched below.
+
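+As a sketch (assuming a populated instance `c` with at least three members), the first member returned is the key's owner and the remaining ones are natural backup candidates:
+
+```go
+members, err := c.GetClosestN([]byte("my-key"), 3)
+if err == nil {
+	// members[0] owns the key; members[1:] are backup candidates.
+	for _, backup := range members[1:] {
+		fmt.Println("backup:", backup.String())
+	}
+}
+```
+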
+Benchmarks
+----------
+On an early 2015 MacBook:
+
+```
+BenchmarkAddRemove-4     	  100000	     22006 ns/op
+BenchmarkLocateKey-4     	 5000000	       252 ns/op
+BenchmarkGetClosestN-4   	  500000	      2974 ns/op
+```
+
+Examples
+--------
+
+The most basic use of the consistent package looks like this. For a detailed list of functions, [visit godoc.org.](https://godoc.org/github.com/buraksezer/consistent)
+More sample code can be found under [_examples](https://github.com/buraksezer/consistent/tree/master/_examples).
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+)
+
+// In your code, you probably have a custom data type 
+// for your cluster members. Just add a String function to implement 
+// the consistent.Member interface.
+type myMember string
+
+func (m myMember) String() string {
+	return string(m)
+}
+
+// The consistent package doesn't provide a default hashing function.
+// You should provide a proper one to distribute keys/members uniformly.
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+	// you should use a proper hash function for uniformity.
+	return xxhash.Sum64(data)
+}
+
+func main() {
+	// Create a new consistent instance
+	cfg := consistent.Config{
+		PartitionCount:    7,
+		ReplicationFactor: 20,
+		Load:              1.25,
+		Hasher:            hasher{},
+	}
+	c := consistent.New(nil, cfg)
+
+	// Add some members to the consistent hash table.
+	// Add function calculates average load and distributes partitions over members
+	node1 := myMember("node1.olric.com")
+	c.Add(node1)
+
+	node2 := myMember("node2.olric.com")
+	c.Add(node2)
+
+	key := []byte("my-key")
+	// calculates partition id for the given key
+	// partID := hash(key) % partitionCount
+	// the partitions are already distributed among members by Add function.
+	owner := c.LocateKey(key)
+	fmt.Println(owner.String())
+	// Prints node2.olric.com
+}
+```
+
+Another useful example is `_examples/relocation_percentage.go`. It creates a `consistent` object with 8 members and distributes partitions among them, then adds a 9th member.
+Here is the result with a proper configuration and hash function:
+
+```
+bloom:consistent burak$ go run _examples/relocation_percentage.go
+partID: 218 moved to node2.olric from node0.olric
+partID: 173 moved to node9.olric from node3.olric
+partID: 225 moved to node7.olric from node0.olric
+partID:  85 moved to node9.olric from node7.olric
+partID: 220 moved to node5.olric from node0.olric
+partID:  33 moved to node9.olric from node5.olric
+partID: 254 moved to node9.olric from node4.olric
+partID:  71 moved to node9.olric from node3.olric
+partID: 236 moved to node9.olric from node2.olric
+partID: 118 moved to node9.olric from node3.olric
+partID: 233 moved to node3.olric from node0.olric
+partID:  50 moved to node9.olric from node4.olric
+partID: 252 moved to node9.olric from node2.olric
+partID: 121 moved to node9.olric from node2.olric
+partID: 259 moved to node9.olric from node4.olric
+partID:  92 moved to node9.olric from node7.olric
+partID: 152 moved to node9.olric from node3.olric
+partID: 105 moved to node9.olric from node2.olric
+
+6% of the partitions are relocated
+```
+
+The number of moved partitions depends heavily on your configuration and on the quality of the hash function. You should tune the configuration to find the optimum
+for your system.
+
+`_examples/load_distribution.go` is also useful for understanding load distribution. It creates a `consistent` object with 8 members and locates 1M keys. It also calculates the average
+load, which cannot be exceeded by any member. Here is the result:
+
+```
+Maximum key count for a member should be around this:  147602
+member: node2.olric, key count: 100362
+member: node5.olric, key count: 99448
+member: node0.olric, key count: 147735
+member: node3.olric, key count: 103455
+member: node6.olric, key count: 147069
+member: node1.olric, key count: 121566
+member: node4.olric, key count: 147932
+member: node7.olric, key count: 132433
+```
+
+The average load can be calculated using the following formula:
+
+```
+load := (consistent.AverageLoad() * float64(keyCount)) / float64(config.PartitionCount)
+```
+
+Contributions
+-------------
+Please don't hesitate to fork the project and send a pull request, or just e-mail me to ask questions and share ideas.
+
+License
+-------
+MIT License - see LICENSE for more details.
diff --git a/vendor/github.com/buraksezer/consistent/consistent.go b/vendor/github.com/buraksezer/consistent/consistent.go
new file mode 100644
index 0000000..a1446d6
--- /dev/null
+++ b/vendor/github.com/buraksezer/consistent/consistent.go
@@ -0,0 +1,362 @@
+// Copyright (c) 2018 Burak Sezer
+// All rights reserved.
+//
+// This code is licensed under the MIT License.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files(the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions :
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// Package consistent provides a consistent hashing function with bounded loads.
+// For more information about the underlying algorithm, please take a look at
+// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
+//
+// Example Use:
+// 	cfg := consistent.Config{
+// 		PartitionCount:    71,
+// 		ReplicationFactor: 20,
+// 		Load:              1.25,
+// 		Hasher:            hasher{},
+//	}
+//
+//      // Create a new consistent object
+//      // You may call this with a list of members
+//      // instead of adding them one by one.
+//	c := consistent.New(members, cfg)
+//
+//      // myMember struct just needs to implement a String method.
+//      // New/Add/Remove distributes partitions among members using the algorithm
+//      // defined on Google Research Blog.
+//	c.Add(myMember)
+//
+//	key := []byte("my-key")
+//      // LocateKey hashes the key and calculates partition ID with
+//      // this modulo operation: MOD(hash result, partition count)
+//      // The owner of the partition is already calculated by New/Add/Remove.
+//      // LocateKey just returns the member that's responsible for the key.
+//	member := c.LocateKey(key)
+//
+package consistent
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"sort"
+	"sync"
+)
+
+var (
+	// ErrInsufficientMemberCount represents an error which means there are not enough members to complete the task.
+	ErrInsufficientMemberCount = errors.New("insufficient member count")
+
+	// ErrMemberNotFound represents an error which means the requested member could not be found in the consistent hash ring.
+	ErrMemberNotFound = errors.New("member could not be found in ring")
+)
+
+// Hasher is responsible for generating an unsigned, 64-bit hash of the provided byte slice.
+// Hasher should minimize collisions (generating the same hash for different byte slices),
+// and while performance is also important, fast functions are preferable (e.g.
+// you can use the FarmHash family).
+type Hasher interface {
+	Sum64([]byte) uint64
+}
+
+// Member interface represents a member in consistent hash ring.
+type Member interface {
+	String() string
+}
+
+// Config represents a structure to control consistent package.
+type Config struct {
+	// Hasher is responsible for generating an unsigned, 64-bit hash of the provided byte slice.
+	Hasher Hasher
+
+	// Keys are distributed among partitions. Prime numbers are good to
+	// distribute keys uniformly. Select a big PartitionCount if you have
+	// too many keys.
+	PartitionCount int
+
+	// Members are replicated on the consistent hash ring. This number controls
+	// how many times each member is replicated on the ring.
+	ReplicationFactor int
+
+	// Load is used to calculate average load. See the code, the paper and Google's blog post to learn about it.
+	Load float64
+}
+
+// Consistent holds the information about the members of the consistent hash circle.
+type Consistent struct {
+	mu sync.RWMutex
+
+	config         Config
+	hasher         Hasher
+	sortedSet      []uint64
+	partitionCount uint64
+	loads          map[string]float64
+	members        map[string]*Member
+	partitions     map[int]*Member
+	ring           map[uint64]*Member
+}
+
+// New creates and returns a new Consistent object.
+func New(members []Member, config Config) *Consistent {
+	c := &Consistent{
+		config:         config,
+		members:        make(map[string]*Member),
+		partitionCount: uint64(config.PartitionCount),
+		ring:           make(map[uint64]*Member),
+	}
+	if config.Hasher == nil {
+		panic("Hasher cannot be nil")
+	}
+	// TODO: Check configuration here
+	c.hasher = config.Hasher
+	for _, member := range members {
+		c.add(member)
+	}
+	if members != nil {
+		c.distributePartitions()
+	}
+	return c
+}
+
+// GetMembers returns a thread-safe copy of members.
+func (c *Consistent) GetMembers() []Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Create a thread-safe copy of member list.
+	members := make([]Member, 0, len(c.members))
+	for _, member := range c.members {
+		members = append(members, *member)
+	}
+	return members
+}
+
+// AverageLoad exposes the current average load.
+func (c *Consistent) AverageLoad() float64 {
+	avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load
+	return math.Ceil(avgLoad)
+}
+
+func (c *Consistent) distributeWithLoad(partID, idx int, partitions map[int]*Member, loads map[string]float64) {
+	avgLoad := c.AverageLoad()
+	var count int
+	for {
+		count++
+		if count >= len(c.sortedSet) {
+			// User needs to decrease partition count, increase member count or increase load factor.
+			panic("not enough room to distribute partitions")
+		}
+		i := c.sortedSet[idx]
+		member := *c.ring[i]
+		load := loads[member.String()]
+		if load+1 <= avgLoad {
+			partitions[partID] = &member
+			loads[member.String()]++
+			return
+		}
+		idx++
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+	}
+}
+
+func (c *Consistent) distributePartitions() {
+	loads := make(map[string]float64)
+	partitions := make(map[int]*Member)
+
+	bs := make([]byte, 8)
+	for partID := uint64(0); partID < c.partitionCount; partID++ {
+		binary.LittleEndian.PutUint64(bs, partID)
+		key := c.hasher.Sum64(bs)
+		idx := sort.Search(len(c.sortedSet), func(i int) bool {
+			return c.sortedSet[i] >= key
+		})
+		if idx >= len(c.sortedSet) {
+			idx = 0
+		}
+		c.distributeWithLoad(int(partID), idx, partitions, loads)
+	}
+	c.partitions = partitions
+	c.loads = loads
+}
+
+func (c *Consistent) add(member Member) {
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", member.String(), i))
+		h := c.hasher.Sum64(key)
+		c.ring[h] = &member
+		c.sortedSet = append(c.sortedSet, h)
+	}
+	// sort hashes in ascending order
+	sort.Slice(c.sortedSet, func(i int, j int) bool {
+		return c.sortedSet[i] < c.sortedSet[j]
+	})
+	// Storing the member in this map is useful for finding backup members of a partition.
+	c.members[member.String()] = &member
+}
+
+// Add adds a new member to the consistent hash circle.
+func (c *Consistent) Add(member Member) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.members[member.String()]; ok {
+		// We already have this member. Quit immediately.
+		return
+	}
+	c.add(member)
+	c.distributePartitions()
+}
+
+func (c *Consistent) delSlice(val uint64) {
+	for i := 0; i < len(c.sortedSet); i++ {
+		if c.sortedSet[i] == val {
+			c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
+			break
+		}
+	}
+}
+
+// Remove removes a member from the consistent hash circle.
+func (c *Consistent) Remove(name string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if _, ok := c.members[name]; !ok {
+		// There is no member with that name. Quit immediately.
+		return
+	}
+
+	for i := 0; i < c.config.ReplicationFactor; i++ {
+		key := []byte(fmt.Sprintf("%s%d", name, i))
+		h := c.hasher.Sum64(key)
+		delete(c.ring, h)
+		c.delSlice(h)
+	}
+	delete(c.members, name)
+	if len(c.members) == 0 {
+		// consistent hash ring is empty now. Reset the partition table.
+		c.partitions = make(map[int]*Member)
+		return
+	}
+	c.distributePartitions()
+}
+
+// LoadDistribution exposes load distribution of members.
+func (c *Consistent) LoadDistribution() map[string]float64 {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	// Create a thread-safe copy
+	res := make(map[string]float64)
+	for member, load := range c.loads {
+		res[member] = load
+	}
+	return res
+}
+
+// FindPartitionID returns partition id for given key.
+func (c *Consistent) FindPartitionID(key []byte) int {
+	hkey := c.hasher.Sum64(key)
+	return int(hkey % c.partitionCount)
+}
+
+// GetPartitionOwner returns the owner of the given partition.
+func (c *Consistent) GetPartitionOwner(partID int) Member {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	member, ok := c.partitions[partID]
+	if !ok {
+		return nil
+	}
+	// Create a thread-safe copy of member and return it.
+	return *member
+}
+
+// LocateKey finds a home for given key
+func (c *Consistent) LocateKey(key []byte) Member {
+	partID := c.FindPartitionID(key)
+	return c.GetPartitionOwner(partID)
+}
+
+func (c *Consistent) getClosestN(partID, count int) ([]Member, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	res := []Member{}
+	if count > len(c.members) {
+		return res, ErrInsufficientMemberCount
+	}
+
+	var ownerKey uint64
+	owner := c.GetPartitionOwner(partID)
+	// Hash and sort all the names.
+	keys := []uint64{}
+	kmems := make(map[uint64]*Member)
+	for name, member := range c.members {
+		key := c.hasher.Sum64([]byte(name))
+		if name == owner.String() {
+			ownerKey = key
+		}
+		keys = append(keys, key)
+		kmems[key] = member
+	}
+	sort.Slice(keys, func(i, j int) bool {
+		return keys[i] < keys[j]
+	})
+
+	// Find the key owner
+	idx := 0
+	for idx < len(keys) {
+		if keys[idx] == ownerKey {
+			key := keys[idx]
+			res = append(res, *kmems[key])
+			break
+		}
+		idx++
+	}
+
+	// Find the closest(replica owners) members.
+	for len(res) < count {
+		idx++
+		if idx >= len(keys) {
+			idx = 0
+		}
+		key := keys[idx]
+		res = append(res, *kmems[key])
+	}
+	return res, nil
+}
+
+// GetClosestN returns the closest N member to a key in the hash ring.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestN(key []byte, count int) ([]Member, error) {
+	partID := c.FindPartitionID(key)
+	return c.getClosestN(partID, count)
+}
+
+// GetClosestNForPartition returns the closest N member for given partition.
+// This may be useful to find members for replication.
+func (c *Consistent) GetClosestNForPartition(partID, count int) ([]Member, error) {
+	return c.getClosestN(partID, count)
+}
diff --git a/vendor/github.com/cespare/xxhash/LICENSE.txt b/vendor/github.com/cespare/xxhash/LICENSE.txt
new file mode 100644
index 0000000..24b5306
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/LICENSE.txt
@@ -0,0 +1,22 @@
+Copyright (c) 2016 Caleb Spare
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/cespare/xxhash/README.md b/vendor/github.com/cespare/xxhash/README.md
new file mode 100644
index 0000000..0982fd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/README.md
@@ -0,0 +1,50 @@
+# xxhash
+
+[![GoDoc](https://godoc.org/github.com/cespare/xxhash?status.svg)](https://godoc.org/github.com/cespare/xxhash)
+
+xxhash is a Go implementation of the 64-bit
+[xxHash](http://cyan4973.github.io/xxHash/) algorithm, XXH64. This is a
+high-quality hashing algorithm that is much faster than anything in the Go
+standard library.
+
+The API is very small, taking its cue from the other hashing packages in the
+standard library:
+
+    $ go doc github.com/cespare/xxhash
+    package xxhash // import "github.com/cespare/xxhash"
+
+    Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+    at http://cyan4973.github.io/xxHash/.
+
+    func New() hash.Hash64
+    func Sum64(b []byte) uint64
+    func Sum64String(s string) uint64
+
+This implementation provides a fast pure-Go implementation and an even faster
+assembly implementation for amd64.
+
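+A minimal usage sketch of the three functions above:
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/cespare/xxhash"
+)
+
+func main() {
+	fmt.Println(xxhash.Sum64([]byte("hello"))) // one-shot hashing
+	fmt.Println(xxhash.Sum64String("hello"))   // avoids copying the string
+
+	h := xxhash.New() // streaming via the hash.Hash64 interface
+	h.Write([]byte("hel"))
+	h.Write([]byte("lo"))
+	fmt.Println(h.Sum64()) // same digest as the one-shot calls above
+}
+```
+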
+## Benchmarks
+
+Here are some quick benchmarks comparing the pure-Go and assembly
+implementations of Sum64 against another popular Go XXH64 implementation,
+[github.com/OneOfOne/xxhash](https://github.com/OneOfOne/xxhash):
+
+| input size | OneOfOne | cespare (purego) | cespare |
+| --- | --- | --- | --- |
+| 5 B   |  416 MB/s | 720 MB/s |  872 MB/s  |
+| 100 B | 3980 MB/s | 5013 MB/s | 5252 MB/s  |
+| 4 KB  | 12727 MB/s | 12999 MB/s | 13026 MB/s |
+| 10 MB | 9879 MB/s | 10775 MB/s | 10913 MB/s  |
+
+These numbers were generated with:
+
+```
+$ go test -benchtime 10s -bench '/OneOfOne,'
+$ go test -tags purego -benchtime 10s -bench '/xxhash,'
+$ go test -benchtime 10s -bench '/xxhash,'
+```
+
+## Projects using this package
+
+- [InfluxDB](https://github.com/influxdata/influxdb)
+- [Prometheus](https://github.com/prometheus/prometheus)
diff --git a/vendor/github.com/cespare/xxhash/go.mod b/vendor/github.com/cespare/xxhash/go.mod
new file mode 100644
index 0000000..10605a6
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.mod
@@ -0,0 +1,6 @@
+module github.com/cespare/xxhash
+
+require (
+	github.com/OneOfOne/xxhash v1.2.2
+	github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
+)
diff --git a/vendor/github.com/cespare/xxhash/go.sum b/vendor/github.com/cespare/xxhash/go.sum
new file mode 100644
index 0000000..f6b5542
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/go.sum
@@ -0,0 +1,4 @@
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
diff --git a/vendor/github.com/cespare/xxhash/rotate.go b/vendor/github.com/cespare/xxhash/rotate.go
new file mode 100644
index 0000000..f3eac5e
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate.go
@@ -0,0 +1,14 @@
+// +build !go1.9
+
+package xxhash
+
+// TODO(caleb): After Go 1.10 comes out, remove this fallback code.
+
+func rol1(x uint64) uint64  { return (x << 1) | (x >> (64 - 1)) }
+func rol7(x uint64) uint64  { return (x << 7) | (x >> (64 - 7)) }
+func rol11(x uint64) uint64 { return (x << 11) | (x >> (64 - 11)) }
+func rol12(x uint64) uint64 { return (x << 12) | (x >> (64 - 12)) }
+func rol18(x uint64) uint64 { return (x << 18) | (x >> (64 - 18)) }
+func rol23(x uint64) uint64 { return (x << 23) | (x >> (64 - 23)) }
+func rol27(x uint64) uint64 { return (x << 27) | (x >> (64 - 27)) }
+func rol31(x uint64) uint64 { return (x << 31) | (x >> (64 - 31)) }
diff --git a/vendor/github.com/cespare/xxhash/rotate19.go b/vendor/github.com/cespare/xxhash/rotate19.go
new file mode 100644
index 0000000..b99612b
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/rotate19.go
@@ -0,0 +1,14 @@
+// +build go1.9
+
+package xxhash
+
+import "math/bits"
+
+func rol1(x uint64) uint64  { return bits.RotateLeft64(x, 1) }
+func rol7(x uint64) uint64  { return bits.RotateLeft64(x, 7) }
+func rol11(x uint64) uint64 { return bits.RotateLeft64(x, 11) }
+func rol12(x uint64) uint64 { return bits.RotateLeft64(x, 12) }
+func rol18(x uint64) uint64 { return bits.RotateLeft64(x, 18) }
+func rol23(x uint64) uint64 { return bits.RotateLeft64(x, 23) }
+func rol27(x uint64) uint64 { return bits.RotateLeft64(x, 27) }
+func rol31(x uint64) uint64 { return bits.RotateLeft64(x, 31) }
diff --git a/vendor/github.com/cespare/xxhash/xxhash.go b/vendor/github.com/cespare/xxhash/xxhash.go
new file mode 100644
index 0000000..f896bd2
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash.go
@@ -0,0 +1,168 @@
+// Package xxhash implements the 64-bit variant of xxHash (XXH64) as described
+// at http://cyan4973.github.io/xxHash/.
+package xxhash
+
+import (
+	"encoding/binary"
+	"hash"
+)
+
+const (
+	prime1 uint64 = 11400714785074694791
+	prime2 uint64 = 14029467366897019727
+	prime3 uint64 = 1609587929392839161
+	prime4 uint64 = 9650029242287828579
+	prime5 uint64 = 2870177450012600261
+)
+
+// NOTE(caleb): I'm using both consts and vars of the primes. Using consts where
+// possible in the Go code is worth a small (but measurable) performance boost
+// by avoiding some MOVQs. Vars are needed for the asm and also are useful for
+// convenience in the Go code in a few places where we need to intentionally
+// avoid constant arithmetic (e.g., v1 := prime1 + prime2 fails because the
+// result overflows a uint64).
+var (
+	prime1v = prime1
+	prime2v = prime2
+	prime3v = prime3
+	prime4v = prime4
+	prime5v = prime5
+)
+
+type xxh struct {
+	v1    uint64
+	v2    uint64
+	v3    uint64
+	v4    uint64
+	total int
+	mem   [32]byte
+	n     int // how much of mem is used
+}
+
+// New creates a new hash.Hash64 that implements the 64-bit xxHash algorithm.
+func New() hash.Hash64 {
+	var x xxh
+	x.Reset()
+	return &x
+}
+
+func (x *xxh) Reset() {
+	x.n = 0
+	x.total = 0
+	x.v1 = prime1v + prime2
+	x.v2 = prime2
+	x.v3 = 0
+	x.v4 = -prime1v
+}
+
+func (x *xxh) Size() int      { return 8 }
+func (x *xxh) BlockSize() int { return 32 }
+
+// Write adds more data to x. It always returns len(b), nil.
+func (x *xxh) Write(b []byte) (n int, err error) {
+	n = len(b)
+	x.total += len(b)
+
+	if x.n+len(b) < 32 {
+		// This new data doesn't even fill the current block.
+		copy(x.mem[x.n:], b)
+		x.n += len(b)
+		return
+	}
+
+	if x.n > 0 {
+		// Finish off the partial block.
+		copy(x.mem[x.n:], b)
+		x.v1 = round(x.v1, u64(x.mem[0:8]))
+		x.v2 = round(x.v2, u64(x.mem[8:16]))
+		x.v3 = round(x.v3, u64(x.mem[16:24]))
+		x.v4 = round(x.v4, u64(x.mem[24:32]))
+		b = b[32-x.n:]
+		x.n = 0
+	}
+
+	if len(b) >= 32 {
+		// One or more full blocks left.
+		b = writeBlocks(x, b)
+	}
+
+	// Store any remaining partial block.
+	copy(x.mem[:], b)
+	x.n = len(b)
+
+	return
+}
+
+func (x *xxh) Sum(b []byte) []byte {
+	s := x.Sum64()
+	return append(
+		b,
+		byte(s>>56),
+		byte(s>>48),
+		byte(s>>40),
+		byte(s>>32),
+		byte(s>>24),
+		byte(s>>16),
+		byte(s>>8),
+		byte(s),
+	)
+}
+
+func (x *xxh) Sum64() uint64 {
+	var h uint64
+
+	if x.total >= 32 {
+		v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+		h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+		h = mergeRound(h, v1)
+		h = mergeRound(h, v2)
+		h = mergeRound(h, v3)
+		h = mergeRound(h, v4)
+	} else {
+		h = x.v3 + prime5
+	}
+
+	h += uint64(x.total)
+
+	i, end := 0, x.n
+	for ; i+8 <= end; i += 8 {
+		k1 := round(0, u64(x.mem[i:i+8]))
+		h ^= k1
+		h = rol27(h)*prime1 + prime4
+	}
+	if i+4 <= end {
+		h ^= uint64(u32(x.mem[i:i+4])) * prime1
+		h = rol23(h)*prime2 + prime3
+		i += 4
+	}
+	for i < end {
+		h ^= uint64(x.mem[i]) * prime5
+		h = rol11(h) * prime1
+		i++
+	}
+
+	h ^= h >> 33
+	h *= prime2
+	h ^= h >> 29
+	h *= prime3
+	h ^= h >> 32
+
+	return h
+}
+
+func u64(b []byte) uint64 { return binary.LittleEndian.Uint64(b) }
+func u32(b []byte) uint32 { return binary.LittleEndian.Uint32(b) }
+
+func round(acc, input uint64) uint64 {
+	acc += input * prime2
+	acc = rol31(acc)
+	acc *= prime1
+	return acc
+}
+
+func mergeRound(acc, val uint64) uint64 {
+	val = round(0, val)
+	acc ^= val
+	acc = acc*prime1 + prime4
+	return acc
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.go b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
new file mode 100644
index 0000000..d617652
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.go
@@ -0,0 +1,12 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+//
+//go:noescape
+func Sum64(b []byte) uint64
+
+func writeBlocks(x *xxh, b []byte) []byte
diff --git a/vendor/github.com/cespare/xxhash/xxhash_amd64.s b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
new file mode 100644
index 0000000..757f201
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_amd64.s
@@ -0,0 +1,233 @@
+// +build !appengine
+// +build gc
+// +build !purego
+
+#include "textflag.h"
+
+// Register allocation:
+// AX	h
+// CX	pointer to advance through b
+// DX	n
+// BX	loop end
+// R8	v1, k1
+// R9	v2
+// R10	v3
+// R11	v4
+// R12	tmp
+// R13	prime1v
+// R14	prime2v
+// R15	prime4v
+
+// round reads from and advances the buffer pointer in CX.
+// It assumes that R13 has prime1v and R14 has prime2v.
+#define round(r) \
+	MOVQ  (CX), R12 \
+	ADDQ  $8, CX    \
+	IMULQ R14, R12  \
+	ADDQ  R12, r    \
+	ROLQ  $31, r    \
+	IMULQ R13, r
+
+// mergeRound applies a merge round on the two registers acc and val.
+// It assumes that R13 has prime1v, R14 has prime2v, and R15 has prime4v.
+#define mergeRound(acc, val) \
+	IMULQ R14, val \
+	ROLQ  $31, val \
+	IMULQ R13, val \
+	XORQ  val, acc \
+	IMULQ R13, acc \
+	ADDQ  R15, acc
+
+// func Sum64(b []byte) uint64
+TEXT ·Sum64(SB), NOSPLIT, $0-32
+	// Load fixed primes.
+	MOVQ ·prime1v(SB), R13
+	MOVQ ·prime2v(SB), R14
+	MOVQ ·prime4v(SB), R15
+
+	// Load slice.
+	MOVQ b_base+0(FP), CX
+	MOVQ b_len+8(FP), DX
+	LEAQ (CX)(DX*1), BX
+
+	// The first loop limit will be len(b)-32.
+	SUBQ $32, BX
+
+	// Check whether we have at least one block.
+	CMPQ DX, $32
+	JLT  noBlocks
+
+	// Set up initial state (v1, v2, v3, v4).
+	MOVQ R13, R8
+	ADDQ R14, R8
+	MOVQ R14, R9
+	XORQ R10, R10
+	XORQ R11, R11
+	SUBQ R13, R11
+
+	// Loop until CX > BX.
+blockLoop:
+	round(R8)
+	round(R9)
+	round(R10)
+	round(R11)
+
+	CMPQ CX, BX
+	JLE  blockLoop
+
+	MOVQ R8, AX
+	ROLQ $1, AX
+	MOVQ R9, R12
+	ROLQ $7, R12
+	ADDQ R12, AX
+	MOVQ R10, R12
+	ROLQ $12, R12
+	ADDQ R12, AX
+	MOVQ R11, R12
+	ROLQ $18, R12
+	ADDQ R12, AX
+
+	mergeRound(AX, R8)
+	mergeRound(AX, R9)
+	mergeRound(AX, R10)
+	mergeRound(AX, R11)
+
+	JMP afterBlocks
+
+noBlocks:
+	MOVQ ·prime5v(SB), AX
+
+afterBlocks:
+	ADDQ DX, AX
+
+	// Right now BX has len(b)-32, and we want to loop until CX > len(b)-8.
+	ADDQ $24, BX
+
+	CMPQ CX, BX
+	JG   fourByte
+
+wordLoop:
+	// Calculate k1.
+	MOVQ  (CX), R8
+	ADDQ  $8, CX
+	IMULQ R14, R8
+	ROLQ  $31, R8
+	IMULQ R13, R8
+
+	XORQ  R8, AX
+	ROLQ  $27, AX
+	IMULQ R13, AX
+	ADDQ  R15, AX
+
+	CMPQ CX, BX
+	JLE  wordLoop
+
+fourByte:
+	ADDQ $4, BX
+	CMPQ CX, BX
+	JG   singles
+
+	MOVL  (CX), R8
+	ADDQ  $4, CX
+	IMULQ R13, R8
+	XORQ  R8, AX
+
+	ROLQ  $23, AX
+	IMULQ R14, AX
+	ADDQ  ·prime3v(SB), AX
+
+singles:
+	ADDQ $4, BX
+	CMPQ CX, BX
+	JGE  finalize
+
+singlesLoop:
+	MOVBQZX (CX), R12
+	ADDQ    $1, CX
+	IMULQ   ·prime5v(SB), R12
+	XORQ    R12, AX
+
+	ROLQ  $11, AX
+	IMULQ R13, AX
+
+	CMPQ CX, BX
+	JL   singlesLoop
+
+finalize:
+	MOVQ  AX, R12
+	SHRQ  $33, R12
+	XORQ  R12, AX
+	IMULQ R14, AX
+	MOVQ  AX, R12
+	SHRQ  $29, R12
+	XORQ  R12, AX
+	IMULQ ·prime3v(SB), AX
+	MOVQ  AX, R12
+	SHRQ  $32, R12
+	XORQ  R12, AX
+
+	MOVQ AX, ret+24(FP)
+	RET
+
+// writeBlocks uses the same registers as above except that it uses AX to store
+// the x pointer.
+
+// func writeBlocks(x *xxh, b []byte) []byte
+TEXT ·writeBlocks(SB), NOSPLIT, $0-56
+	// Load fixed primes needed for round.
+	MOVQ ·prime1v(SB), R13
+	MOVQ ·prime2v(SB), R14
+
+	// Load slice.
+	MOVQ b_base+8(FP), CX
+	MOVQ CX, ret_base+32(FP) // initialize return base pointer; see NOTE below
+	MOVQ b_len+16(FP), DX
+	LEAQ (CX)(DX*1), BX
+	SUBQ $32, BX
+
+	// Load vN from x.
+	MOVQ x+0(FP), AX
+	MOVQ 0(AX), R8   // v1
+	MOVQ 8(AX), R9   // v2
+	MOVQ 16(AX), R10 // v3
+	MOVQ 24(AX), R11 // v4
+
+	// We don't need to check the loop condition here; this function is
+	// always called with at least one block of data to process.
+blockLoop:
+	round(R8)
+	round(R9)
+	round(R10)
+	round(R11)
+
+	CMPQ CX, BX
+	JLE  blockLoop
+
+	// Copy vN back to x.
+	MOVQ R8, 0(AX)
+	MOVQ R9, 8(AX)
+	MOVQ R10, 16(AX)
+	MOVQ R11, 24(AX)
+
+	// Construct return slice.
+	// NOTE: It's important that we don't construct a slice that has a base
+	// pointer off the end of the original slice, as in Go 1.7+ this will
+	// cause runtime crashes. (See discussion in, for example,
+	// https://github.com/golang/go/issues/16772.)
+	// Therefore, we calculate the length/cap first, and if they're zero, we
+	// keep the old base. This is what the compiler does as well if you
+	// write code like
+	//   b = b[len(b):]
+
+	// New length is 32 - (CX - BX) -> BX+32 - CX.
+	ADDQ $32, BX
+	SUBQ CX, BX
+	JZ   afterSetBase
+
+	MOVQ CX, ret_base+32(FP)
+
+afterSetBase:
+	MOVQ BX, ret_len+40(FP)
+	MOVQ BX, ret_cap+48(FP) // set cap == len
+
+	RET
diff --git a/vendor/github.com/cespare/xxhash/xxhash_other.go b/vendor/github.com/cespare/xxhash/xxhash_other.go
new file mode 100644
index 0000000..c68d13f
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_other.go
@@ -0,0 +1,75 @@
+// +build !amd64 appengine !gc purego
+
+package xxhash
+
+// Sum64 computes the 64-bit xxHash digest of b.
+func Sum64(b []byte) uint64 {
+	// A simpler version would be
+	//   x := New()
+	//   x.Write(b)
+	//   return x.Sum64()
+	// but this is faster, particularly for small inputs.
+
+	n := len(b)
+	var h uint64
+
+	if n >= 32 {
+		v1 := prime1v + prime2
+		v2 := prime2
+		v3 := uint64(0)
+		v4 := -prime1v
+		for len(b) >= 32 {
+			v1 = round(v1, u64(b[0:8:len(b)]))
+			v2 = round(v2, u64(b[8:16:len(b)]))
+			v3 = round(v3, u64(b[16:24:len(b)]))
+			v4 = round(v4, u64(b[24:32:len(b)]))
+			b = b[32:len(b):len(b)]
+		}
+		h = rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
+		h = mergeRound(h, v1)
+		h = mergeRound(h, v2)
+		h = mergeRound(h, v3)
+		h = mergeRound(h, v4)
+	} else {
+		h = prime5
+	}
+
+	h += uint64(n)
+
+	i, end := 0, len(b)
+	for ; i+8 <= end; i += 8 {
+		k1 := round(0, u64(b[i:i+8:len(b)]))
+		h ^= k1
+		h = rol27(h)*prime1 + prime4
+	}
+	if i+4 <= end {
+		h ^= uint64(u32(b[i:i+4:len(b)])) * prime1
+		h = rol23(h)*prime2 + prime3
+		i += 4
+	}
+	for ; i < end; i++ {
+		h ^= uint64(b[i]) * prime5
+		h = rol11(h) * prime1
+	}
+
+	h ^= h >> 33
+	h *= prime2
+	h ^= h >> 29
+	h *= prime3
+	h ^= h >> 32
+
+	return h
+}
+
+func writeBlocks(x *xxh, b []byte) []byte {
+	v1, v2, v3, v4 := x.v1, x.v2, x.v3, x.v4
+	for len(b) >= 32 {
+		v1 = round(v1, u64(b[0:8:len(b)]))
+		v2 = round(v2, u64(b[8:16:len(b)]))
+		v3 = round(v3, u64(b[16:24:len(b)]))
+		v4 = round(v4, u64(b[24:32:len(b)]))
+		b = b[32:len(b):len(b)]
+	}
+	x.v1, x.v2, x.v3, x.v4 = v1, v2, v3, v4
+	return b
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_safe.go b/vendor/github.com/cespare/xxhash/xxhash_safe.go
new file mode 100644
index 0000000..dfa15ab
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_safe.go
@@ -0,0 +1,10 @@
+// +build appengine
+
+// This file contains the safe implementations of otherwise unsafe-using code.
+
+package xxhash
+
+// Sum64String computes the 64-bit xxHash digest of s.
+func Sum64String(s string) uint64 {
+	return Sum64([]byte(s))
+}
diff --git a/vendor/github.com/cespare/xxhash/xxhash_unsafe.go b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
new file mode 100644
index 0000000..d2b64e8
--- /dev/null
+++ b/vendor/github.com/cespare/xxhash/xxhash_unsafe.go
@@ -0,0 +1,30 @@
+// +build !appengine
+
+// This file encapsulates usage of unsafe.
+// xxhash_safe.go contains the safe implementations.
+
+package xxhash
+
+import (
+	"reflect"
+	"unsafe"
+)
+
+// Sum64String computes the 64-bit xxHash digest of s.
+// It may be faster than Sum64([]byte(s)) by avoiding a copy.
+//
+// TODO(caleb): Consider removing this if an optimization is ever added to make
+// it unnecessary: https://golang.org/issue/2205.
+//
+// TODO(caleb): We still have a function call; we could instead write Go/asm
+// copies of Sum64 for strings to squeeze out a bit more speed.
+func Sum64String(s string) uint64 {
+	// See https://groups.google.com/d/msg/golang-nuts/dcjzJy-bSpw/tcZYBzQqAQAJ
+	// for some discussion about this unsafe conversion.
+	var b []byte
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	bh.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+	bh.Len = len(s)
+	bh.Cap = len(s)
+	return Sum64(b)
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
 
 import (
 	"context"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
 	kafkaICProxy kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	endpointMgr  kafka.EndpointManager
 }
 
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
-	var proxy AdapterProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.adapterTopic = adapterTopic
-	proxy.coreTopic = coreTopic
-	logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+	proxy := AdapterProxy{
+		kafkaICProxy: kafkaProxy,
+		adapterTopic: adapterTopic,
+		coreTopic:    coreTopic,
+		endpointMgr:  kafka.NewEndpointManager(backend),
+	}
+	logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 	return &proxy
 }
 
@@ -87,7 +91,11 @@
 	}
 
 	// Set up the required rpc arguments
-	topic := kafka.Topic{Name: toAdapter}
+	endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	topic := kafka.Topic{Name: string(endpoint)}
 	replyToTopic := kafka.Topic{Name: fromAdapter}
 	rpc := "process_inter_adapter_message"
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
 	topic := kafka.Topic{Name: ap.coreTopic}
 	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
+
+	if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+		log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+		log.Fatal("currentReplica can't be 0, it has to start from 1")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+		// if the adapter is not setting these fields they default to 0,
+		// in that case it means the adapter is not ready to be scaled and thus it defaults
+		// to a single instance
+		adapter.CurrentReplica = 1
+		adapter.TotalReplicas = 1
+	}
+
+	if adapter.CurrentReplica > adapter.TotalReplicas {
+		log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+			adapter.CurrentReplica, adapter.TotalReplicas)
+	}
+
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
 		Value: adapter,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 9f08b0d..724ad32 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -63,10 +63,10 @@
 	ConfigAttribute string
 }
 
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
 // in kvstore based persistent storage
 type ConfigManager struct {
-	backend             *db.Backend
+	Backend             *db.Backend
 	KvStoreConfigPrefix string
 }
 
@@ -95,7 +95,7 @@
 
 	return &ConfigManager{
 		KvStoreConfigPrefix: defaultkvStoreConfigPath,
-		backend: &db.Backend{
+		Backend: &db.Backend{
 			Client:     kvClient,
 			StoreType:  kvStoreType,
 			Host:       kvStoreHost,
@@ -108,12 +108,12 @@
 
 // RetrieveComponentList list the component Names for which loglevel is stored in kvstore
 func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
-	data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+	data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
 	if err != nil {
 		return nil, err
 	}
 
-	// Looping through the data recieved from the backend for config
+	// Looping through the data received from the Backend for config
 	// Trimming and Splitting the required key and value from data and  storing as componentName,PackageName and Level
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
 
 	c.changeEventChan = make(chan *ConfigChangeEvent, 1)
 
-	c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+	c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
 
 	go c.processKVStoreWatchEvents()
 
 	return c.changeEventChan
 }
 
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents processes the event channel received from the Backend for any ChangeType
 // It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
 func (c *ComponentConfig) processKVStoreWatchEvents() {
 
@@ -183,7 +183,7 @@
 
 	logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
 
-	ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
 
 	for watchResp := range c.kvStoreEventChan {
 
@@ -210,7 +210,7 @@
 
 	logger.Debugw("retrieving-config", log.Fields{"key": key})
 
-	if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+	if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
 		return "", err
 	} else {
 		if kvpair == nil {
@@ -228,17 +228,17 @@
 
 	logger.Debugw("retreiving-list", log.Fields{"key": key})
 
-	data, err := c.cManager.backend.List(ctx, key)
+	data, err := c.cManager.Backend.List(ctx, key)
 	if err != nil {
 		return nil, err
 	}
 
-	// Looping through the data recieved from the backend for the given key
+	// Looping through the data received from the Backend for the given key
 	// Trimming the required key and value from data and  storing as key/value pair
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
 	res := make(map[string]string)
-	ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
 	for attr, val := range data {
 		res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
 	}
@@ -252,7 +252,7 @@
 	logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
 
 	//save the data for update config
-	if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+	if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
 		return err
 	}
 	return nil
@@ -264,7 +264,7 @@
 
 	logger.Debugw("deleting-config", log.Fields{"key": key})
 	//delete the config
-	if err := c.cManager.backend.Delete(ctx, key); err != nil {
+	if err := c.cManager.Backend.Delete(ctx, key); err != nil {
 		return err
 	}
 	return nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b929c9d..9c36241 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -15,7 +15,7 @@
  */
 
 // Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
 // Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
 // starting kvClient for the component.
 
@@ -121,8 +121,8 @@
 }
 
 // ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. The event channel will be received from the Backend for valid change types.
+// Then data for component log config and global log config will be retrieved from the Backend and stored in updatedLogConfig in precedence order.
 // If any changes in updatedLogConfig will be applied on component
 func (c *ComponentLogController) processLogConfig(ctx context.Context) {
 
@@ -247,7 +247,7 @@
 	return componentLogConfig, nil
 }
 
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig  from backend
+// buildUpdatedLogConfig retrieves the global logConfig and component logConfig from the Backend
 // component logConfig stores the log config with precedence order
 // For example, If the global logConfig is set and component logConfig is set only for specific package then
 // component logConfig is stored with global logConfig  and component logConfig of specific package
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..1258382
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seem to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of a service. A tuning sketch follows the const block.
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
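+// As an illustrative sketch only (an assumption, not code from this file), the
+// defaults above can be overridden through the functional options defined below:
+//
+//	mgr := NewEndpointManager(backend, PartitionCount(1117), ReplicationFactor(117), Load(1.1))
+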
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is
+	// used by tests only.
+	GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
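+// As an illustrative sketch only (an assumption, not part of this API's definition):
+// an adapter proxy can resolve the per-device topic roughly like this, where
+// backend and deviceID are assumed to be in scope:
+//
+//	mgr := NewEndpointManager(backend)
+//	endpoint, err := mgr.GetEndpoint(deviceID, "adapter-type")
+//	if err == nil {
+//		topic := Topic{Name: string(endpoint)}
+//		_ = topic // the topic to which inter-adapter messages are sent
+//	}
+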
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return false, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return 0, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(); err != nil {
+			return nil, "", err
+		}
+
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because the data set is small and
+// is stored in the DB as binary protobuf, it is better to reload all the data when an inconsistency is detected,
+// instead of watching for updates in the DB and acting on them.
+func (ep *endpointManager) loadServices() error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(context.Background(), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(context.Background(), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+
+	// Log the loaded data in debug mode to facilitate troubleshooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compared to other hash packages.
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getServiceType() string {
+	return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
similarity index 98%
copy from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
copy to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
index 90612bb..a45b4b2 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package etcd
 
 import (
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
index 487b991..b4e201d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd_server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/etcd_server.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package etcd
 
 import (
 	"fmt"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
index 90612bb..05bc5f9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package kafka
 
 import (
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
new file mode 100644
index 0000000..fedbebf
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/endpoint_manager.go
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+)
+
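+// EndpointManager is a no-op mock of kafka.EndpointManager for unit tests;
+// its methods return fixed values instead of consulting a consistent ring.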
+type EndpointManager struct{}
+
+func NewEndpointManager() kafka.EndpointManager {
+	mock := &EndpointManager{}
+	return mock
+}
+
+func (em *EndpointManager) GetEndpoint(deviceID string, serviceType string) (kafka.Endpoint, error) {
+	// TODO: add mock calls and args
+	return kafka.Endpoint(serviceType), nil
+}
+
+func (em *EndpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	// TODO: add mock calls and args
+	return true, nil
+}
+
+func (em *EndpointManager) GetReplicaAssignment(deviceID string, serviceType string) (kafka.ReplicaID, error) {
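+	// The mock always assigns replica 1.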
+	return kafka.ReplicaID(1), nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
index 38f147e..5922ce2 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_client.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mocks
+package kafka
 
 import (
 	"fmt"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 9879830..34aec95 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_inter_container_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package mocks
+package kafka
 
 import (
 	"context"
diff --git a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
index 4cc76e0..9359dc1 100644
--- a/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
+++ b/vendor/github.com/opencord/voltha-protos/v3/go/voltha/adapter.pb.go
@@ -65,8 +65,8 @@
 
 // Adapter (software plugin)
 type Adapter struct {
-	// Unique name of adapter, matching the python package name under
-	// voltha/adapters.
+	// the adapter ID must be unique;
+	// it is generated as Type + CurrentReplica
 	Id      string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
 	Vendor  string `protobuf:"bytes,2,opt,name=vendor,proto3" json:"vendor,omitempty"`
 	Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"`
@@ -76,10 +76,16 @@
 	AdditionalDescription *any.Any `protobuf:"bytes,64,opt,name=additional_description,json=additionalDescription,proto3" json:"additional_description,omitempty"`
 	LogicalDeviceIds      []string `protobuf:"bytes,4,rep,name=logical_device_ids,json=logicalDeviceIds,proto3" json:"logical_device_ids,omitempty"`
 	// timestamp when the adapter last sent a message to the core
-	LastCommunication    *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}             `json:"-"`
-	XXX_unrecognized     []byte               `json:"-"`
-	XXX_sizecache        int32                `json:"-"`
+	LastCommunication *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_communication,json=lastCommunication,proto3" json:"last_communication,omitempty"`
+	CurrentReplica    int32                `protobuf:"varint,6,opt,name=currentReplica,proto3" json:"currentReplica,omitempty"`
+	TotalReplicas     int32                `protobuf:"varint,7,opt,name=totalReplicas,proto3" json:"totalReplicas,omitempty"`
+	Endpoint          string               `protobuf:"bytes,8,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+	// all replicas of the same adapter will have the same type;
+	// it is used to associate a device with an adapter
+	Type                 string   `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
 }
 
 func (m *Adapter) Reset()         { *m = Adapter{} }
@@ -156,6 +162,34 @@
 	return nil
 }
 
+func (m *Adapter) GetCurrentReplica() int32 {
+	if m != nil {
+		return m.CurrentReplica
+	}
+	return 0
+}
+
+func (m *Adapter) GetTotalReplicas() int32 {
+	if m != nil {
+		return m.TotalReplicas
+	}
+	return 0
+}
+
+func (m *Adapter) GetEndpoint() string {
+	if m != nil {
+		return m.Endpoint
+	}
+	return ""
+}
+
+func (m *Adapter) GetType() string {
+	if m != nil {
+		return m.Type
+	}
+	return ""
+}
+
 type Adapters struct {
 	Items                []*Adapter `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
 	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
@@ -204,31 +238,34 @@
 func init() { proto.RegisterFile("voltha_protos/adapter.proto", fileDescriptor_7e998ce153307274) }
 
 var fileDescriptor_7e998ce153307274 = []byte{
-	// 405 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xd1, 0x6a, 0xdb, 0x30,
-	0x14, 0x86, 0x71, 0xb2, 0xb8, 0xab, 0x4a, 0x59, 0xaa, 0x2d, 0xc3, 0xf3, 0x28, 0x35, 0x81, 0x81,
-	0x2f, 0x56, 0x99, 0xb5, 0x2f, 0xb0, 0xa4, 0xbd, 0xe9, 0xad, 0x28, 0xbb, 0xd8, 0x8d, 0x51, 0x24,
-	0xd5, 0x15, 0xd8, 0x3a, 0xc6, 0x52, 0x0c, 0x7d, 0xc8, 0xbd, 0xc1, 0x1e, 0x60, 0x4f, 0xb0, 0xeb,
-	0x11, 0x49, 0x26, 0x4e, 0x06, 0xbd, 0x32, 0xfa, 0xbf, 0xff, 0xfc, 0xe7, 0x1c, 0xc9, 0xe8, 0x73,
-	0x0f, 0xb5, 0x7d, 0x66, 0x65, 0xdb, 0x81, 0x05, 0x53, 0x30, 0xc1, 0x5a, 0x2b, 0x3b, 0xe2, 0x8e,
-	0x38, 0xf6, 0x30, 0xfd, 0x54, 0x01, 0x54, 0xb5, 0x2c, 0x9c, 0xba, 0xd9, 0x3e, 0x15, 0x4c, 0xbf,
-	0x78, 0x4b, 0x9a, 0x1e, 0xd6, 0x73, 0x68, 0x1a, 0xd0, 0x81, 0x25, 0x87, 0xac, 0x91, 0x96, 0x05,
-	0x72, 0x75, 0x1c, 0x68, 0x55, 0x23, 0x8d, 0x65, 0x4d, 0xeb, 0x0d, 0x4b, 0x8a, 0xce, 0x57, 0x7e,
-	0x94, 0x3b, 0xd0, 0x4f, 0xaa, 0xc2, 0x2b, 0x74, 0xc1, 0x84, 0x50, 0x56, 0x81, 0x66, 0x75, 0xc9,
-	0x9d, 0x98, 0x7c, 0xcf, 0xa2, 0xfc, 0xec, 0xe6, 0x03, 0xf1, 0x69, 0x64, 0x48, 0x23, 0x2b, 0xfd,
-	0x42, 0xe7, 0x7b, 0xbb, 0x8f, 0x58, 0xfe, 0x9e, 0xa0, 0x93, 0x10, 0x8a, 0x17, 0x68, 0xa2, 0x44,
-	0x12, 0x65, 0x51, 0x7e, 0xba, 0x9e, 0xfd, 0xf9, 0xfb, 0xeb, 0x32, 0xa2, 0x13, 0x25, 0xf0, 0x25,
-	0x8a, 0x7b, 0xa9, 0x05, 0x74, 0xc9, 0x64, 0x8c, 0x82, 0x88, 0xaf, 0xd0, 0x49, 0x2f, 0x3b, 0xa3,
-	0x40, 0x27, 0xd3, 0x31, 0x1f, 0x54, 0x7c, 0x8d, 0xe2, 0x30, 0xda, 0xdc, 0x8d, 0xb6, 0x20, 0xfe,
-	0x0a, 0xc8, 0xc1, 0x32, 0x34, 0x98, 0x30, 0x45, 0x1f, 0x47, 0x4b, 0x09, 0x69, 0x78, 0xa7, 0xda,
-	0xdd, 0xe9, 0xb5, 0xcd, 0x86, 0xa6, 0x8b, 0x7d, 0xe9, 0xfd, 0xbe, 0x12, 0x7f, 0x45, 0xb8, 0x86,
-	0x4a, 0x71, 0x17, 0xd8, 0x2b, 0x2e, 0x4b, 0x25, 0x4c, 0xf2, 0x26, 0x9b, 0xe6, 0xa7, 0x74, 0x1e,
-	0xc8, 0xbd, 0x03, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x6b, 0x66, 0x6c, 0xb9, 0x7b, 0xb7, 0xad, 0x56,
-	0x9c, 0xb9, 0xee, 0x33, 0xd7, 0x3d, 0xfd, 0xaf, 0xfb, 0xe3, 0xf0, 0x4a, 0xf4, 0x62, 0x57, 0x75,
-	0x37, 0x2e, 0x5a, 0x7e, 0x43, 0x6f, 0xc3, 0x96, 0x06, 0x7f, 0x41, 0x33, 0x65, 0x65, 0x63, 0x92,
-	0x28, 0x9b, 0xe6, 0x67, 0x37, 0xef, 0x8e, 0xae, 0x81, 0x7a, 0xba, 0x7e, 0x44, 0xef, 0xa1, 0xab,
-	0x08, 0xb4, 0x52, 0x73, 0xe8, 0x44, 0x70, 0xad, 0xcf, 0x7f, 0xb8, 0x6f, 0x30, 0xff, 0x24, 0x95,
-	0xb2, 0xcf, 0xdb, 0x0d, 0xe1, 0xd0, 0x14, 0x83, 0xb5, 0xf0, 0xd6, 0xeb, 0xf0, 0x6b, 0xf5, 0xb7,
-	0x45, 0x05, 0x41, 0xdb, 0xc4, 0x4e, 0xbc, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xc5, 0xdf,
-	0x09, 0xdb, 0x02, 0x00, 0x00,
+	// 463 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x52, 0xdd, 0x6e, 0xd3, 0x30,
+	0x14, 0x56, 0xda, 0x35, 0x6d, 0x3d, 0x15, 0x3a, 0x43, 0x91, 0x09, 0x9a, 0x16, 0x55, 0x80, 0x72,
+	0xc1, 0x12, 0xb1, 0xbd, 0x00, 0xed, 0x76, 0xb3, 0x5b, 0x6b, 0xe2, 0x82, 0x9b, 0xca, 0xb5, 0xbd,
+	0xcc, 0x52, 0xe2, 0x13, 0xc5, 0x6e, 0xa4, 0x3e, 0x24, 0x2f, 0xc0, 0x13, 0xf0, 0x04, 0x5c, 0xa3,
+	0x3a, 0x0e, 0xfd, 0x41, 0xda, 0x55, 0x72, 0xbe, 0xef, 0x3b, 0xdf, 0xf9, 0x33, 0xfa, 0xd0, 0x40,
+	0x61, 0x9f, 0xd9, 0xaa, 0xaa, 0xc1, 0x82, 0xc9, 0x98, 0x60, 0x95, 0x95, 0x75, 0xea, 0x42, 0x1c,
+	0xb6, 0x64, 0xf4, 0x3e, 0x07, 0xc8, 0x0b, 0x99, 0x39, 0x74, 0xbd, 0x79, 0xca, 0x98, 0xde, 0xb6,
+	0x92, 0x28, 0x3a, 0xce, 0xe7, 0x50, 0x96, 0xa0, 0x3d, 0x47, 0x8e, 0xb9, 0x52, 0x5a, 0xe6, 0x99,
+	0xab, 0x53, 0x43, 0xab, 0x4a, 0x69, 0x2c, 0x2b, 0xab, 0x56, 0x30, 0xa7, 0x68, 0xb2, 0x68, 0x5b,
+	0xb9, 0x03, 0xfd, 0xa4, 0x72, 0xbc, 0x40, 0x17, 0x4c, 0x08, 0x65, 0x15, 0x68, 0x56, 0xac, 0xb8,
+	0x03, 0xc9, 0xb7, 0x38, 0x48, 0xce, 0x6f, 0xde, 0xa6, 0xad, 0x5b, 0xda, 0xb9, 0xa5, 0x0b, 0xbd,
+	0xa5, 0xd3, 0xbd, 0xbc, 0xb5, 0x98, 0xff, 0xea, 0xa3, 0xa1, 0x37, 0xc5, 0x33, 0xd4, 0x53, 0x82,
+	0x04, 0x71, 0x90, 0x8c, 0x97, 0x83, 0xdf, 0x7f, 0x7e, 0x5e, 0x06, 0xb4, 0xa7, 0x04, 0xbe, 0x44,
+	0x61, 0x23, 0xb5, 0x80, 0x9a, 0xf4, 0x0e, 0x29, 0x0f, 0xe2, 0x2b, 0x34, 0x6c, 0x64, 0x6d, 0x14,
+	0x68, 0xd2, 0x3f, 0xe4, 0x3b, 0x14, 0x5f, 0xa3, 0xd0, 0xb7, 0x36, 0x75, 0xad, 0xcd, 0xd2, 0x76,
+	0x05, 0xe9, 0xd1, 0x30, 0xd4, 0x8b, 0x30, 0x45, 0xef, 0x0e, 0x86, 0x12, 0xd2, 0xf0, 0x5a, 0x55,
+	0xbb, 0xe8, 0xa5, 0xc9, 0xba, 0xa2, 0xb3, 0x7d, 0xea, 0xfd, 0x3e, 0x13, 0x7f, 0x41, 0xb8, 0x80,
+	0x5c, 0x71, 0x67, 0xd8, 0x28, 0x2e, 0x57, 0x4a, 0x18, 0x72, 0x16, 0xf7, 0x93, 0x31, 0x9d, 0x7a,
+	0xe6, 0xde, 0x11, 0x0f, 0xc2, 0xe0, 0x07, 0x84, 0x0b, 0x66, 0xec, 0x6a, 0x77, 0xb7, 0x8d, 0x56,
+	0x9c, 0xb9, 0xea, 0x03, 0x57, 0x3d, 0xfa, 0xaf, 0xfa, 0x63, 0x77, 0x25, 0x7a, 0xb1, 0xcb, 0xba,
+	0x3b, 0x4c, 0xc2, 0x9f, 0xd1, 0x2b, 0xbe, 0xa9, 0x6b, 0xa9, 0x2d, 0x95, 0x55, 0xa1, 0x38, 0x23,
+	0x61, 0x1c, 0x24, 0x03, 0x7a, 0x82, 0xe2, 0x8f, 0x68, 0x62, 0xc1, 0xb2, 0xc2, 0xc7, 0x86, 0x0c,
+	0x9d, 0xec, 0x18, 0xc4, 0x11, 0x1a, 0x49, 0x2d, 0x2a, 0x50, 0xda, 0x92, 0xd1, 0x6e, 0xd7, 0xf4,
+	0x5f, 0x8c, 0x31, 0x3a, 0xb3, 0xdb, 0x4a, 0x92, 0xb1, 0xc3, 0xdd, 0xff, 0xfc, 0x2b, 0x1a, 0xf9,
+	0x1d, 0x1b, 0xfc, 0x09, 0x0d, 0x94, 0x95, 0xa5, 0x21, 0x41, 0xdc, 0x4f, 0xce, 0x6f, 0x5e, 0x9f,
+	0x1c, 0x81, 0xb6, 0xec, 0xf2, 0x11, 0xbd, 0x81, 0x3a, 0x4f, 0xa1, 0x92, 0x9a, 0x43, 0x2d, 0xbc,
+	0x6a, 0x39, 0xf9, 0xee, 0xbe, 0x5e, 0xfc, 0x23, 0xcd, 0x95, 0x7d, 0xde, 0xac, 0x53, 0x0e, 0x65,
+	0xd6, 0x49, 0xb3, 0x56, 0x7a, 0xed, 0x1f, 0x76, 0x73, 0x9b, 0xe5, 0xe0, 0xb1, 0x75, 0xe8, 0xc0,
+	0xdb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xef, 0x64, 0x5e, 0x10, 0x59, 0x03, 0x00, 0x00,
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5b3c29c..8bd422c 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -8,6 +8,10 @@
 github.com/beorn7/perks/quantile
 # github.com/bsm/sarama-cluster v2.1.15+incompatible
 github.com/bsm/sarama-cluster
+# github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
+github.com/buraksezer/consistent
+# github.com/cespare/xxhash v1.1.0
+github.com/cespare/xxhash
 # github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73
 github.com/cevaris/ordered_map
 # github.com/coreos/go-semver v0.2.0
@@ -95,7 +99,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.1.0
+# github.com/opencord/voltha-lib-go/v3 v3.1.2
 github.com/opencord/voltha-lib-go/v3/pkg/adapters
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/common
@@ -106,10 +110,11 @@
 github.com/opencord/voltha-lib-go/v3/pkg/grpc
 github.com/opencord/voltha-lib-go/v3/pkg/kafka
 github.com/opencord/voltha-lib-go/v3/pkg/log
-github.com/opencord/voltha-lib-go/v3/pkg/mocks
+github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd
+github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka
 github.com/opencord/voltha-lib-go/v3/pkg/probe
 github.com/opencord/voltha-lib-go/v3/pkg/version
-# github.com/opencord/voltha-protos/v3 v3.2.8
+# github.com/opencord/voltha-protos/v3 v3.3.0
 github.com/opencord/voltha-protos/v3/go/common
 github.com/opencord/voltha-protos/v3/go/inter_container
 github.com/opencord/voltha-protos/v3/go/omci