[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index a6d8186..c2d45de 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -18,44 +18,108 @@
 
 import (
 	"context"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"errors"
 	"sync"
 	"time"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc"
 )
 
 // agent represents adapter agent
 type agent struct {
-	adapter *voltha.Adapter
-	lock    sync.RWMutex
+	adapter            *voltha.Adapter
+	lock               sync.RWMutex
+	adapterAPIEndPoint string
+	vClient            *vgrpc.Client
+	adapterLock        sync.RWMutex
+	onAdapterRestart   vgrpc.RestartedHandler
+	liveProbeInterval  time.Duration
 }
 
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	svc := adapter_services.NewAdapterServiceClient(conn)
+	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+		logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": h})
+		return nil
+	}
+	return svc
+}
+
+func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
 	return &agent{
-		adapter: adapter,
+		adapter:            adapter,
+		onAdapterRestart:   onAdapterRestart,
+		adapterAPIEndPoint: adapter.Endpoint,
+		liveProbeInterval:  liveProbeInterval,
+	}
+}
+
+func (aa *agent) start(ctx context.Context) error {
+	// Establish a gRPC connection to the adapter endpoint
+	var err error
+	if aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint,
+		aa.onAdapterRestart,
+		vgrpc.ActivityCheck(true)); err != nil {
+		return err
+	}
+
+	// Add a liveness communication update
+	aa.vClient.SubscribeForLiveness(aa.updateCommunicationTime)
+
+	go aa.vClient.Start(ctx, setAndTestAdapterServiceHandler)
+	return nil
+}
+
+func (aa *agent) stop(ctx context.Context) {
+	// Close the client
+	if aa.vClient != nil {
+		aa.vClient.Stop(ctx)
 	}
 }
 
 func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
-	logger.Debugw(ctx, "getAdapter", log.Fields{"adapter": aa.adapter})
+	aa.adapterLock.RLock()
+	defer aa.adapterLock.RUnlock()
 	return aa.adapter
 }
 
+func (aa *agent) getClient() (adapter_services.AdapterServiceClient, error) {
+	client, err := aa.vClient.GetClient()
+	if err != nil {
+		return nil, err
+	}
+	c, ok := client.(adapter_services.AdapterServiceClient)
+	if ok {
+		return c, nil
+	}
+	return nil, errors.New("invalid client returned")
+}
+
+func (aa *agent) resetConnection(ctx context.Context) {
+	if aa.vClient != nil {
+		aa.vClient.Reset(ctx)
+	}
+}
+
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
 func (aa *agent) updateCommunicationTime(new time.Time) {
 	// only update if new time is not in the future, and either the old time is invalid or new time > old time
 	aa.lock.Lock()
 	defer aa.lock.Unlock()
-	if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
-		timestamp, err := ptypes.TimestampProto(new)
-		if err != nil {
-			return // if the new time cannot be encoded, just ignore it
-		}
-
-		aa.adapter.LastCommunication = timestamp
+	timestamp := time.Unix(aa.adapter.LastCommunication, 0)
+	if !new.After(time.Now()) && new.After(timestamp) {
+		timestamp = new
+		aa.adapter.LastCommunication = timestamp.Unix()
 	}
 }
+
+func (aa *agent) IsConnectionUp() bool {
+	_, err := aa.getClient()
+	return err == nil
+}
diff --git a/rw_core/core/adapter/common.go b/rw_core/core/adapter/common.go
index 0c6b2c3..354ccc9 100644
--- a/rw_core/core/adapter/common.go
+++ b/rw_core/core/adapter/common.go
@@ -18,7 +18,7 @@
 package adapter
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/rw_core/core/adapter/endpoint_manager.go b/rw_core/core/adapter/endpoint_manager.go
new file mode 100644
index 0000000..dac137c
--- /dev/null
+++ b/rw_core/core/adapter/endpoint_manager.go
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapter
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seems to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of an adapter
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
+type Endpoint string // The gRPC endpoint of an adapter instance
+type ReplicaID int32 // The replication ID of an adapter instance
+
+type EndpointManager interface {
+
+	// Registers an adapter
+	RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and device type.
+	GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByAdapter is invoked when a specific adapter (adapter type + replicaNumber) is restarted and
+	// devices owned by that adapter need to be reconciled
+	IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error)
+
+	// GetReplicaAssignment returns the replica number of the adapter that owns the deviceID.  This is used by the
+	// test only
+	GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error)
+}
+
+type adapterService struct {
+	adapterType    string // Type of the adapter.  The same type applies for all replicas of that adapter
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount                    int
+	replicationFactor                 int
+	load                              float64
+	backend                           *db.Backend
+	adapterServices                   map[string]*adapterService
+	adapterServicesLock               sync.RWMutex
+	deviceTypeToAdapterServiceMap     map[string]string
+	deviceTypeToAdapterServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:                DefaultPartitionCount,
+		replicationFactor:             DefaultReplicationFactor,
+		load:                          DefaultLoad,
+		backend:                       backend,
+		adapterServices:               make(map[string]*adapterService),
+		deviceTypeToAdapterServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error) {
+	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType})
+	owner, err := ep.getOwnerByDeviceType(ctx, deviceID, deviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", deviceType)
+	}
+	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error) {
+	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "adapter-type": adapterType, "replica-number": replicaNumber})
+
+	serv, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return false, err
+	}
+	m, ok := serv.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", serv)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error) {
+	owner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return 0, err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwnerByDeviceType(ctx context.Context, deviceID string, deviceType string) (consistent.Member, error) {
+	serv, err := ep.getAdapterService(ctx, deviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, deviceType, serv.adapterType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getOwnerByAdapterType(ctx context.Context, deviceID string, adapterType string) (consistent.Member, error) {
+	// Check whether the adapter exist
+	ep.adapterServicesLock.RLock()
+	serv, adapterExist := ep.adapterServices[adapterType]
+	ep.adapterServicesLock.RUnlock()
+
+	if !adapterExist {
+		// Sync from the dB
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+		// Check again
+		ep.adapterServicesLock.RLock()
+		serv, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+		if !adapterExist {
+			return nil, fmt.Errorf("adapter-type-not-exist-%s", adapterType)
+		}
+	}
+
+	// Get the device type
+	deviceType := ""
+	ep.deviceTypeToAdapterServiceMapLock.RLock()
+	for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+		if aType == adapterType {
+			deviceType = dType
+			break
+		}
+	}
+	ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+
+	if deviceType == "" {
+		return nil, fmt.Errorf("device-type-not-exist-for-adapter-type-%s", adapterType)
+	}
+
+	owner := serv.consistentRing.LocateKey(ep.makeKey(deviceID, deviceType, serv.adapterType))
+	m, ok := owner.(Member)
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m, nil
+}
+
+func (ep *endpointManager) getAdapterService(ctx context.Context, deviceType string) (*adapterService, error) {
+	// First get the adapter type for that device type
+	adapterType := ""
+	ep.deviceTypeToAdapterServiceMapLock.RLock()
+	for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+		if dType == deviceType {
+			adapterType = aType
+			break
+		}
+	}
+	ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+
+	// Check whether the adapter exist
+	adapterExist := false
+	var aServ *adapterService
+	if adapterType != "" {
+		ep.adapterServicesLock.RLock()
+		aServ, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+	}
+
+	// Load the service and device types if not found, i.e. sync up with the dB
+	if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+
+		// Get the adapter type if it was empty before
+		if adapterType == "" {
+			ep.deviceTypeToAdapterServiceMapLock.RLock()
+			for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+				if dType == deviceType {
+					adapterType = aType
+					break
+				}
+			}
+			ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+		}
+		// Error out if the adapter type is not set
+		if adapterType == "" {
+			return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+		}
+
+		// Get the service
+		ep.adapterServicesLock.RLock()
+		aServ, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+	}
+
+	// Sanity check
+	if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
+		return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+	}
+
+	return aServ, nil
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadAdapterServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadAdapterServices(ctx context.Context) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+
+	ep.adapterServices = make(map[string]*adapterService)
+	ep.deviceTypeToAdapterServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+			logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		ep.addDeviceTypeWithLock(deviceType)
+	}
+
+	ep.printServices(ctx)
+	return nil
+}
+
+func (ep *endpointManager) printServices(ctx context.Context) {
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.adapterServices {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw(ctx, "adapter-service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw(ctx, "adapter-instance-registered", log.Fields{"service-id": n.getID(), "adapter-type": n.getAdapterType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw(ctx, "device-types", log.Fields{"device-types": ep.deviceTypeToAdapterServiceMap})
+	}
+}
+
+func (ep *endpointManager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+		return err
+	}
+	ep.addDeviceTypesWithLock(deviceTypes)
+	ep.printServices(ctx)
+	return nil
+}
+
+func (ep *endpointManager) setupAdapterWithLock(ctx context.Context, adapter *voltha.Adapter) error {
+	// Build the consistent ring for that adapter
+	if adapter.Vendor != "" {
+		if _, ok := ep.adapterServices[adapter.Type]; !ok {
+			ep.adapterServices[adapter.Type] = &adapterService{
+				adapterType:    adapter.Type,
+				totalReplicas:  adapter.TotalReplicas,
+				replicas:       make(map[ReplicaID]Endpoint),
+				consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+			}
+
+		}
+		currentReplica := ReplicaID(adapter.CurrentReplica)
+		endpoint := Endpoint(adapter.Endpoint)
+		ep.adapterServices[adapter.Type].replicas[currentReplica] = endpoint
+		ep.adapterServices[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+	} else {
+		logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
+		return fmt.Errorf("missing vendor id for %s adapter", adapter.Id)
+	}
+	return nil
+}
+
+func (ep *endpointManager) addDeviceTypesWithLock(deviceTypes *voltha.DeviceTypes) {
+	// Update the device types
+	for _, deviceType := range deviceTypes.Items {
+		if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+		}
+	}
+}
+
+func (ep *endpointManager) addDeviceTypeWithLock(deviceType *voltha.DeviceType) {
+	if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
+		ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+	}
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+// In most cases, a deviceType is the same as a serviceType.  It is being differentiated here to allow a
+// serviceType to support multiple device types
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getAdapterType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	adapterType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          id,
+		adapterType: adapterType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getAdapterType() string {
+	return m.adapterType
+}
diff --git a/rw_core/core/adapter/endpoint_manager_test.go b/rw_core/core/adapter/endpoint_manager_test.go
new file mode 100644
index 0000000..78635d1
--- /dev/null
+++ b/rw_core/core/adapter/endpoint_manager_test.go
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adapter
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+type EPTest struct {
+	etcdServer  *etcd.EtcdServer
+	backend     *db.Backend
+	maxReplicas int
+	minReplicas int
+}
+
+func newEPTest(minReplicas, maxReplicas int) *EPTest {
+	ctx := context.Background()
+	test := &EPTest{
+		minReplicas: minReplicas,
+		maxReplicas: maxReplicas,
+	}
+
+	// Create backend
+	if err := test.initBackend(); err != nil {
+		logger.Fatalw(ctx, "setting-backend-failed", log.Fields{"error": err})
+	}
+
+	// Populate backend with data
+	if err := test.populateBackend(); err != nil {
+		logger.Fatalw(ctx, "populating-db-failed", log.Fields{"error": err})
+	}
+	return test
+}
+
+func (ep *EPTest) initBackend() error {
+	ctx := context.Background()
+	configName := "voltha-go.adapter.ep.test"
+	storageDir := "voltha-go.adapter.ep.etcd"
+	logLevel := "error"
+	timeout := 5 * time.Second
+
+	kvClientPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+	ep.etcdServer = etcd.StartEtcdServer(ctx, etcd.MKConfig(ctx, configName, kvClientPort, peerPort, storageDir, logLevel))
+	if ep.etcdServer == nil {
+		return status.Error(codes.Internal, "Embedded server failed to start")
+	}
+
+	ep.backend = db.NewBackend(ctx, "etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
+	return nil
+}
+
+func (ep *EPTest) stopAll() {
+	if ep.etcdServer != nil {
+		ep.etcdServer.Stop(context.Background())
+	}
+}
+
+func (ep *EPTest) populateBackend() error {
+	// Add an adapter with multiple replicas
+	adapterPrefix := "adapter_brcm_openomci_onu"
+	numReplicas := ep.maxReplicas
+	for i := 0; i < numReplicas; i++ {
+		adapter := &voltha.Adapter{
+			Id:             fmt.Sprintf("%s_%d", adapterPrefix, i),
+			Vendor:         "VOLTHA OpenONU",
+			Version:        "2.4.0-dev0",
+			Type:           adapterPrefix,
+			CurrentReplica: int32(i),
+			TotalReplicas:  int32(numReplicas),
+			Endpoint:       fmt.Sprintf("%s_%d", adapterPrefix, i),
+		}
+		adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
+		blob, err := proto.Marshal(adapter)
+		if err != nil {
+			return err
+		}
+		if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
+			return err
+		}
+	}
+
+	// Add an adapter with minreplicas
+	adapterPrefix = "adapter_openolt"
+	numReplicas = ep.minReplicas
+	for i := 0; i < numReplicas; i++ {
+		adapter := &voltha.Adapter{
+			Id:             fmt.Sprintf("%s_%d", adapterPrefix, i),
+			Vendor:         "VOLTHA OpenOLT",
+			Version:        "2.3.1-dev",
+			Type:           adapterPrefix,
+			CurrentReplica: int32(i),
+			TotalReplicas:  int32(numReplicas),
+			Endpoint:       fmt.Sprintf("%s_%d", adapterPrefix, i),
+		}
+		adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
+		blob, err := proto.Marshal(adapter)
+		if err != nil {
+			return err
+		}
+		if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
+			return err
+		}
+	}
+
+	// Add the brcm_openomci_onu device type
+	dType := "brcm_openomci_onu"
+	adapterName := "adapter_brcm_openomci_onu"
+	deviceType := &voltha.DeviceType{
+		Id:                          dType,
+		VendorIds:                   []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
+		AdapterType:                 adapterName,
+		AcceptsAddRemoveFlowUpdates: true,
+	}
+	blob, err := proto.Marshal(deviceType)
+	if err != nil {
+		return err
+	}
+	if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
+		return err
+	}
+
+	// Add the openolt device type
+	dType = "openolt"
+	adapterName = "adapter_openolt"
+	deviceType = &voltha.DeviceType{
+		Id:                          dType,
+		AdapterType:                 adapterName,
+		AcceptsAddRemoveFlowUpdates: true,
+	}
+	blob, err = proto.Marshal(deviceType)
+	if err != nil {
+		return err
+	}
+	if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
+		return err
+	}
+	return nil
+}
+
+func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
+	var sum, mean, sd float64
+	for i := 0; i < replicas; i++ {
+		sum += float64(val[i])
+	}
+	mean = sum / float64(replicas)
+
+	for j := 0; j < replicas; j++ {
+		sd += math.Pow(float64(val[j])-mean, 2)
+	}
+	sd = math.Sqrt(sd / float64(replicas))
+	return mean, sd
+}
+
+func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, adapterType string, deviceType string, replicas int) {
+	ctx := context.Background()
+	// Map of device ids to topic
+	deviceIDs := make(map[string]Endpoint)
+	numDevices := 1000
+	total := make([]int, replicas)
+	for i := 0; i < numDevices; i++ {
+		deviceID := uuid.New().String()
+		endpoint, err := tm.GetEndpoint(ctx, deviceID, deviceType)
+		if err != nil {
+			logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
+		}
+		deviceIDs[deviceID] = endpoint
+		replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
+		if err != nil {
+			logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
+		}
+		total[replicaID]++
+	}
+
+	mean, sdtDev := getMeanAndStdDeviation(total, replicas)
+	fmt.Printf("Device distributions => devices:%d adapter_replicas:%d mean:%d standard_deviation:%d, distributions:%v\n", numDevices, replicas, int(mean), int(sdtDev), total)
+
+	// Verify that we get the same topic for a given device ID, irrespective of the number of iterations
+	numIterations := 10
+	for i := 0; i < numIterations; i++ {
+		for deviceID, expectedEndpoint := range deviceIDs {
+			endpointByAdapterType, err := tm.GetEndpoint(ctx, deviceID, deviceType)
+			if err != nil {
+				logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
+			}
+			assert.Equal(t, expectedEndpoint, endpointByAdapterType)
+		}
+	}
+
+	// Verify that a device belongs to the correct node
+	for deviceID := range deviceIDs {
+		replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
+		if err != nil {
+			logger.Fatalw(ctx, "error-getting-topic", log.Fields{"error": err})
+		}
+		for k := 0; k < replicas; k++ {
+			owned, err := tm.IsDeviceOwnedByAdapter(ctx, deviceID, adapterType, int32(k))
+			if err != nil {
+				logger.Fatalw(ctx, "error-verifying-device-ownership", log.Fields{"error": err})
+			}
+			assert.Equal(t, ReplicaID(k) == replicaID, owned)
+		}
+	}
+}
+
+func TestEndpointManagerSuite(t *testing.T) {
+	tmt := newEPTest(1, 10)
+	assert.NotNil(t, tmt)
+
+	tm := NewEndpointManager(
+		tmt.backend,
+		PartitionCount(1117),
+		ReplicationFactor(200),
+		Load(1.1))
+
+	defer tmt.stopAll()
+
+	//1. Test APIs with multiple replicas
+	tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
+
+	//2. Test APIs with single replica
+	tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
+}
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 772ff75..b592842 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -18,129 +18,149 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-lib-go/v5/pkg/probe"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
 // Manager represents adapter manager attributes
 type Manager struct {
-	adapterAgents               map[string]*agent
-	deviceTypes                 map[string]*voltha.DeviceType
-	adapterProxy                *model.Proxy
-	deviceTypeProxy             *model.Proxy
-	onAdapterRestart            adapterRestartedHandler
-	coreInstanceID              string
-	lockAdaptersMap             sync.RWMutex
-	lockdDeviceTypeToAdapterMap sync.RWMutex
+	adapterAgents           map[string]*agent
+	adapterEndpoints        map[Endpoint]*agent
+	deviceTypes             map[string]*voltha.DeviceType
+	adapterDbProxy          *model.Proxy
+	deviceTypeDbProxy       *model.Proxy
+	onAdapterRestart        vgrpc.RestartedHandler
+	endpointMgr             EndpointManager
+	lockAdapterAgentsMap    sync.RWMutex
+	lockDeviceTypesMap      sync.RWMutex
+	lockAdapterEndPointsMap sync.RWMutex
+	liveProbeInterval       time.Duration
 }
 
-func NewAdapterManager(ctx context.Context, dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
-	aMgr := &Manager{
-		coreInstanceID:  coreInstanceID,
-		adapterProxy:    dbPath.Proxy("adapters"),
-		deviceTypeProxy: dbPath.Proxy("device_types"),
-		deviceTypes:     make(map[string]*voltha.DeviceType),
-		adapterAgents:   make(map[string]*agent),
-	}
-	kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
-	return aMgr
-}
-
-// an interface type for callbacks
-// if more than one callback is required, this should be converted to a proper interface
-type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
-
-func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+// SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
+func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart vgrpc.RestartedHandler) {
 	aMgr.onAdapterRestart = onAdapterRestart
 }
 
-func (aMgr *Manager) Start(ctx context.Context) {
-	probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusPreparing)
-	logger.Info(ctx, "starting-adapter-manager")
+func NewAdapterManager(
+	dbPath *model.Path,
+	coreInstanceID string,
+	backend *db.Backend,
+	liveProbeInterval time.Duration,
+) *Manager {
+	return &Manager{
+		adapterDbProxy:    dbPath.Proxy("adapters"),
+		deviceTypeDbProxy: dbPath.Proxy("device_types"),
+		deviceTypes:       make(map[string]*voltha.DeviceType),
+		adapterAgents:     make(map[string]*agent),
+		adapterEndpoints:  make(map[Endpoint]*agent),
+		endpointMgr:       NewEndpointManager(backend),
+		liveProbeInterval: liveProbeInterval,
+	}
+}
+
+func (aMgr *Manager) Start(ctx context.Context, serviceName string) {
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusPreparing)
+	logger.Infow(ctx, "starting-service", log.Fields{"service": serviceName})
 
 	// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
 	// created if there are no data in the dB to start
 	err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
 	if err != nil {
-		logger.Fatalf(ctx, "failed-to-load-adapters-and-device-types-in-memory: %s", err)
+		logger.Fatalw(ctx, "failed-to-load-adapters-and-device-types-in-memory", log.Fields{"service": serviceName, "error": err})
 	}
 
-	probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
-	logger.Info(ctx, "adapter-manager-started")
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+	logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
 }
 
-//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
-	// Load the adapters
-	var adapters []*voltha.Adapter
-	if err := aMgr.adapterProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
-		logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
-		return err
+func (aMgr *Manager) Stop(ctx context.Context) {
+	//	Stop all adapters
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	for _, adapterAgent := range aMgr.adapterAgents {
+		adapterAgent.stop(ctx)
 	}
-	if len(adapters) != 0 {
-		for _, adapter := range adapters {
-			if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
-				logger.Errorw(ctx, "failed to add adapter", log.Fields{"adapterId": adapter.Id})
-			} else {
-				logger.Debugw(ctx, "adapter added successfully", log.Fields{"adapterId": adapter.Id})
-			}
-		}
-	}
-
-	// Load the device types
-	var deviceTypes []*voltha.DeviceType
-	if err := aMgr.deviceTypeProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
-		logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
-		return err
-	}
-	if len(deviceTypes) != 0 {
-		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
-		for _, dType := range deviceTypes {
-			logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": dTypes})
-			dTypes.Items = append(dTypes.Items, dType)
-		}
-		return aMgr.addDeviceTypes(ctx, dTypes, false)
-	}
-
-	logger.Debug(ctx, "no-existing-device-type-found")
-
-	return nil
 }
 
-func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp time.Time) {
-	aMgr.lockAdaptersMap.RLock()
-	adapterAgent, have := aMgr.adapterAgents[adapterID]
-	aMgr.lockAdaptersMap.RUnlock()
+func (aMgr *Manager) GetAdapterEndpoint(ctx context.Context, deviceID string, deviceType string) (string, error) {
+	endPoint, err := aMgr.endpointMgr.GetEndpoint(ctx, deviceID, deviceType)
+	if err != nil {
+		return "", err
+	}
+	return string(endPoint), nil
+}
+
+func (aMgr *Manager) GetAdapterWithEndpoint(ctx context.Context, endPoint string) (*voltha.Adapter, error) {
+	aMgr.lockAdapterEndPointsMap.RLock()
+	agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+	aMgr.lockAdapterEndPointsMap.RUnlock()
 
 	if have {
-		adapterAgent.updateCommunicationTime(timestamp)
+		return agent.getAdapter(ctx), nil
 	}
+
+	return nil, errors.New("Not found")
+}
+
+func (aMgr *Manager) GetAdapterNameWithEndpoint(ctx context.Context, endPoint string) (string, error) {
+	aMgr.lockAdapterEndPointsMap.RLock()
+	agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+	aMgr.lockAdapterEndPointsMap.RUnlock()
+
+	if have {
+		return agent.adapter.Id, nil
+	}
+
+	return "", errors.New("Not found")
+}
+
+func (aMgr *Manager) GetAdapterClient(_ context.Context, endpoint string) (adapter_services.AdapterServiceClient, error) {
+	if endpoint == "" {
+		return nil, errors.New("endpoint-cannot-be-empty")
+	}
+	aMgr.lockAdapterEndPointsMap.RLock()
+	defer aMgr.lockAdapterEndPointsMap.RUnlock()
+
+	if agent, have := aMgr.adapterEndpoints[Endpoint(endpoint)]; have {
+		return agent.getClient()
+	}
+
+	return nil, fmt.Errorf("Endpoint-not-found-%s", endpoint)
 }
 
 func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
-	aMgr.lockAdaptersMap.Lock()
-	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockAdapterAgentsMap.Lock()
+	aMgr.lockAdapterEndPointsMap.Lock()
+	defer aMgr.lockAdapterEndPointsMap.Unlock()
+	defer aMgr.lockAdapterAgentsMap.Unlock()
 	logger.Debugw(ctx, "adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
 		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 	if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
 		if saveToDb {
 			// Save the adapter to the KV store - first check if it already exist
-			if have, err := aMgr.adapterProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
+			if have, err := aMgr.adapterDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
 				logger.Errorw(ctx, "failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
 				return err
 			} else if !have {
-				if err := aMgr.adapterProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
+				if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
 					logger.Errorw(ctx, "failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
 						"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
 					return err
@@ -155,7 +175,11 @@
 			}
 		}
 		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
-		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
+		// Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
+		// This handler just logs the restart event.  The actual action taken following an adapter restart
+		// will be done when an adapter re-registers itself.
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+		aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
 	}
 	return nil
 }
@@ -165,10 +189,10 @@
 		return fmt.Errorf("no-device-type")
 	}
 	logger.Debugw(ctx, "adding-device-types", log.Fields{"deviceTypes": deviceTypes})
-	aMgr.lockAdaptersMap.Lock()
-	defer aMgr.lockAdaptersMap.Unlock()
-	aMgr.lockdDeviceTypeToAdapterMap.Lock()
-	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	aMgr.lockAdapterAgentsMap.Lock()
+	defer aMgr.lockAdapterAgentsMap.Unlock()
+	aMgr.lockDeviceTypesMap.Lock()
+	defer aMgr.lockDeviceTypesMap.Unlock()
 
 	// create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
 	for _, deviceType := range deviceTypes.Items {
@@ -178,13 +202,13 @@
 	if saveToDb {
 		// Save the device types to the KV store
 		for _, deviceType := range deviceTypes.Items {
-			if have, err := aMgr.deviceTypeProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
+			if have, err := aMgr.deviceTypeDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
 				logger.Errorw(ctx, "Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
 				return err
 			} else if !have {
 				//	Does not exist - save it
 				clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
-				if err := aMgr.deviceTypeProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
+				if err := aMgr.deviceTypeDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
 					logger.Errorw(ctx, "Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
 					return err
 				}
@@ -192,35 +216,71 @@
 			}
 		}
 	}
-
 	return nil
 }
 
-// ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
-	result := &voltha.Adapters{Items: []*voltha.Adapter{}}
-	aMgr.lockAdaptersMap.RLock()
-	defer aMgr.lockAdaptersMap.RUnlock()
-	for _, adapterAgent := range aMgr.adapterAgents {
-		if a := adapterAgent.getAdapter(ctx); a != nil {
-			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
+	// Load the adapters
+	var adapters []*voltha.Adapter
+	if err := aMgr.adapterDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
+		logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
+		return err
+	}
+
+	logger.Debugw(ctx, "retrieved-adapters", log.Fields{"count": len(adapters)})
+
+	if len(adapters) != 0 {
+		for _, adapter := range adapters {
+			if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
+				logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"adapterId": adapter.Id})
+			} else {
+				logger.Debugw(ctx, "adapter-added-successfully", log.Fields{"adapterId": adapter.Id})
+			}
 		}
 	}
-	return result, nil
-}
 
-func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
-	aMgr.lockAdaptersMap.RLock()
-	defer aMgr.lockAdaptersMap.RUnlock()
-	if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
-		return adapterAgent.getAdapter(ctx)
+	// Load the device types
+	var deviceTypes []*voltha.DeviceType
+	if err := aMgr.deviceTypeDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
+		logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
+		return err
 	}
+
+	logger.Debugw(ctx, "retrieved-devicetypes", log.Fields{"count": len(deviceTypes)})
+
+	if len(deviceTypes) != 0 {
+		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+		for _, dType := range deviceTypes {
+			logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": deviceTypes})
+			dTypes.Items = append(dTypes.Items, dType)
+		}
+		if err := aMgr.addDeviceTypes(ctx, dTypes, false); err != nil {
+			logger.Errorw(ctx, "failed-to-add-device-type", log.Fields{"deviceTypes": deviceTypes})
+		} else {
+			logger.Debugw(ctx, "device-type-added-successfully", log.Fields{"deviceTypes": deviceTypes})
+		}
+	}
+
+	// Start the adapter agents - this will trigger the connection to the adapter
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	for _, adapterAgent := range aMgr.adapterAgents {
+		subCtx := log.WithSpanFromContext(context.Background(), ctx)
+		if err := adapterAgent.start(subCtx); err != nil {
+			logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"adapter-endpoint": adapterAgent.adapterAPIEndPoint})
+		}
+	}
+
+	logger.Debug(ctx, "no-existing-device-type-found")
+
 	return nil
 }
 
-func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
-	logger.Debugw(ctx, "RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
-		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, registration *ic.AdapterRegistration) (*empty.Empty, error) {
+	adapter := registration.Adapter
+	deviceTypes := registration.DTypes
+	logger.Infow(ctx, "RegisterAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
 
 	if adapter.Type == "" {
 		logger.Errorw(ctx, "adapter-not-specifying-type", log.Fields{
@@ -231,15 +291,25 @@
 		return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
 	}
 
-	if aMgr.getAdapter(ctx, adapter.Id) != nil {
+	if adpt, _ := aMgr.getAdapter(ctx, adapter.Id); adpt != nil {
 		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
+		logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
+
+		// First reset the adapter connection
+		agt, err := aMgr.getAgent(ctx, adpt.Id)
+		if err != nil {
+			logger.Errorw(ctx, "no-adapter-agent", log.Fields{"error": err})
+			return nil, err
+		}
+		agt.resetConnection(ctx)
+
 		go func() {
-			err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adapter)
+			err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adpt.Endpoint)
 			if err != nil {
 				logger.Errorw(ctx, "unable-to-restart-adapter", log.Fields{"error": err})
 			}
 		}()
-		return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+		return &empty.Empty{}, nil
 	}
 	// Save the adapter and the device types
 	if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
@@ -254,16 +324,45 @@
 	logger.Debugw(ctx, "adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
 		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
-	return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+	// Setup the endpoints for this adapter
+	if err := aMgr.endpointMgr.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
+		logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
+	}
+
+	// Start adapter instance - this will trigger the connection to the adapter
+	if agent, err := aMgr.getAgent(ctx, adapter.Id); agent != nil {
+		subCtx := log.WithSpanFromContext(context.Background(), ctx)
+		if err := agent.start(subCtx); err != nil {
+			logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"error": err})
+			return nil, err
+		}
+	} else {
+		logger.Fatalw(ctx, "adapter-absent", log.Fields{"error": err, "adapter": adapter.Id})
+	}
+
+	return &empty.Empty{}, nil
 }
 
-// GetAdapterType returns the name of the device adapter that service this device type
+func (aMgr *Manager) GetAdapterTypeByVendorID(vendorID string) (string, error) {
+	aMgr.lockDeviceTypesMap.RLock()
+	defer aMgr.lockDeviceTypesMap.RUnlock()
+	for _, dType := range aMgr.deviceTypes {
+		for _, v := range dType.VendorIds {
+			if v == vendorID {
+				return dType.AdapterType, nil
+			}
+		}
+	}
+	return "", fmt.Errorf("vendor id %s not found", vendorID)
+}
+
+// GetAdapterType returns the name of the device adapter that services this device type
 func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
-	aMgr.lockdDeviceTypeToAdapterMap.Lock()
-	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
-	for _, adapterAgent := range aMgr.adapterAgents {
-		if deviceType == adapterAgent.adapter.Type {
-			return adapterAgent.adapter.Type, nil
+	aMgr.lockDeviceTypesMap.Lock()
+	defer aMgr.lockDeviceTypesMap.Unlock()
+	for _, dt := range aMgr.deviceTypes {
+		if deviceType == dt.Id {
+			return dt.AdapterType, nil
 		}
 	}
 	return "", fmt.Errorf("adapter-not-registered-for-device-type %s", deviceType)
@@ -272,8 +371,8 @@
 // ListDeviceTypes returns all the device types known to the system
 func (aMgr *Manager) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
 	logger.Debug(ctx, "ListDeviceTypes")
-	aMgr.lockdDeviceTypeToAdapterMap.Lock()
-	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	aMgr.lockDeviceTypesMap.Lock()
+	defer aMgr.lockDeviceTypesMap.Unlock()
 
 	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
 	for _, deviceType := range aMgr.deviceTypes {
@@ -283,10 +382,10 @@
 }
 
 // GetDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
+func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *common.ID) (*voltha.DeviceType, error) {
 	logger.Debugw(ctx, "GetDeviceType", log.Fields{"typeid": deviceType.Id})
-	aMgr.lockdDeviceTypeToAdapterMap.Lock()
-	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	aMgr.lockDeviceTypesMap.Lock()
+	defer aMgr.lockDeviceTypesMap.Unlock()
 
 	dType, exist := aMgr.deviceTypes[deviceType.Id]
 	if !exist {
@@ -294,3 +393,81 @@
 	}
 	return dType, nil
 }
+
+// ListAdapters returns the contents of all adapters known to the system
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+	logger.Debug(ctx, "Listing adapters")
+	result := &voltha.Adapters{Items: []*voltha.Adapter{}}
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if a := adapterAgent.getAdapter(ctx); a != nil {
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+		}
+	}
+	logger.Debugw(ctx, "Listing adapters", log.Fields{"result": result})
+	return result, nil
+}
+
+func (aMgr *Manager) getAgent(ctx context.Context, adapterID string) (*agent, error) {
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
+		return adapterAgent, nil
+	}
+	return nil, errors.New("Not found")
+}
+
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) (*voltha.Adapter, error) {
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
+		return adapterAgent.getAdapter(ctx), nil
+	}
+	return nil, errors.New("Not found")
+}
+
+// mutedAdapterRestartedHandler will be invoked by the grpc client on an adapter restart.
+// Since the adapter will re-register itself, which in turn triggers the reconcile process,
+// this handler does nothing other than logging the event.
+func (aMgr *Manager) mutedAdapterRestartedHandler(ctx context.Context, endpoint string) error {
+	logger.Infow(ctx, "muted-adapter-restarted", log.Fields{"endpoint": endpoint})
+	return nil
+}
+
+func (aMgr *Manager) WaitUntilConnectionsToAdaptersAreUp(ctx context.Context, connectionRetryInterval time.Duration) error {
+	logger.Infow(ctx, "waiting-for-adapters-to-be-up", log.Fields{"retry-interval": connectionRetryInterval})
+	for {
+		aMgr.lockAdapterAgentsMap.Lock()
+		numAdapters := len(aMgr.adapterAgents)
+		if numAdapters == 0 {
+			// No adapter registered yet
+			aMgr.lockAdapterAgentsMap.Unlock()
+			logger.Info(ctx, "no-adapter-registered")
+			return nil
+		}
+		// A case of Core restart
+		agentsUp := true
+	adapterloop:
+		for _, agt := range aMgr.adapterAgents {
+			agentsUp = agentsUp && agt.IsConnectionUp()
+			if !agentsUp {
+				break adapterloop
+			}
+		}
+		aMgr.lockAdapterAgentsMap.Unlock()
+		if agentsUp {
+			logger.Infow(ctx, "adapter-connections-ready", log.Fields{"adapter-count": numAdapters})
+			return nil
+		}
+		logger.Warnw(ctx, "adapter-connections-not-ready", log.Fields{"adapter-count": numAdapters})
+		select {
+		case <-time.After(connectionRetryInterval):
+			logger.Infow(ctx, "retrying-adapter-connections", log.Fields{"adapter-count": numAdapters})
+			continue
+		case <-ctx.Done():
+			logger.Errorw(ctx, "context-timeout", log.Fields{"adapter-count": numAdapters, "err": ctx.Err()})
+			return ctx.Err()
+		}
+	}
+}