[VOL-2835] Using different topic per ONU device

Change-Id: I3e55064292f28f9bf39ad6bc75fd5758f5313317
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 1ed5b23..faece55 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -32,12 +32,6 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
-// sentinel constants
-const (
-	SentinelAdapterID    = "adapter_sentinel"
-	SentinelDevicetypeID = "device_type_sentinel"
-)
-
 // AdapterAgent represents adapter agent
 type AdapterAgent struct {
 	adapter     *voltha.Adapter
@@ -58,15 +52,6 @@
 	return &adapterAgent
 }
 
-func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
-	if _, exist := aa.deviceTypes[deviceType]; exist {
-		return aa.deviceTypes[deviceType]
-	}
-	return nil
-}
-
 func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
 	aa.lock.RLock()
 	defer aa.lock.RUnlock()
@@ -74,12 +59,6 @@
 	return aa.adapter
 }
 
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
-	aa.lock.Lock()
-	defer aa.lock.Unlock()
-	aa.deviceTypes[deviceType.Id] = deviceType
-}
-
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
 func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
@@ -99,7 +78,7 @@
 // AdapterManager represents adapter manager attributes
 type AdapterManager struct {
 	adapterAgents               map[string]*AdapterAgent
-	deviceTypeToAdapterMap      map[string]string
+	deviceTypes                 map[string]*voltha.DeviceType
 	clusterDataProxy            *model.Proxy
 	deviceMgr                   *DeviceManager
 	coreInstanceID              string
@@ -110,12 +89,12 @@
 
 func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
 	aMgr := &AdapterManager{
-		exitChannel:            make(chan int, 1),
-		coreInstanceID:         coreInstanceID,
-		clusterDataProxy:       cdProxy,
-		adapterAgents:          make(map[string]*AdapterAgent),
-		deviceTypeToAdapterMap: make(map[string]string),
-		deviceMgr:              deviceMgr,
+		exitChannel:      make(chan int, 1),
+		coreInstanceID:   coreInstanceID,
+		clusterDataProxy: cdProxy,
+		deviceTypes:      make(map[string]*voltha.DeviceType),
+		adapterAgents:    make(map[string]*AdapterAgent),
+		deviceMgr: deviceMgr,
 	}
 	kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
 	return aMgr
@@ -153,10 +132,6 @@
 				logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
 			}
 		}
-	} else {
-		logger.Debug("no-existing-adapter-found")
-		//	No adapter data.   In order to have a proxy setup for that path let's create a fake adapter
-		return aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
 	}
 
 	// Load the device types
@@ -175,8 +150,8 @@
 	}
 
 	logger.Debug("no-existing-device-type-found")
-	//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
-	return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+
+	return nil
 }
 
 func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
@@ -192,7 +167,8 @@
 func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
-	logger.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+	logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 	if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
 		if saveToDb {
 			// Save the adapter to the KV store - first check if it already exist
@@ -201,10 +177,17 @@
 				return err
 			} else if !have {
 				if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
-					logger.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+					logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+						"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
 					return err
 				}
-				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+				logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+					"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
+			} else {
+				log.Warnw("adding-adapter-already-in-KV-store", log.Fields{
+					"adapterName":    adapter.Id,
+					"adapterReplica": adapter.CurrentReplica,
+				})
 			}
 		}
 		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
@@ -223,6 +206,11 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
+	// create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
+	for _, deviceType := range deviceTypes.Items {
+		aMgr.deviceTypes[deviceType.Id] = deviceType
+	}
+
 	if saveToDb {
 		// Save the device types to the KV store
 		for _, deviceType := range deviceTypes.Items {
@@ -240,17 +228,7 @@
 			}
 		}
 	}
-	// and save locally
-	for _, deviceType := range deviceTypes.Items {
-		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
-		if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
-			adapterAgent.updateDeviceType(clonedDType)
-		} else {
-			logger.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
-			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
-		}
-		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
-	}
+
 	return nil
 }
 
@@ -260,9 +238,7 @@
 	defer aMgr.lockAdaptersMap.RUnlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
 		if a := adapterAgent.getAdapter(); a != nil {
-			if a.Id != SentinelAdapterID { // don't report the sentinel
-				result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
-			}
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
 		}
 	}
 	return result, nil
@@ -278,7 +254,8 @@
 }
 
 func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
-	logger.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+	logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
 
 	if aMgr.getAdapter(adapter.Id) != nil {
 		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
@@ -300,17 +277,20 @@
 		return nil, err
 	}
 
-	logger.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
 }
 
-//getAdapterName returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+// getAdapterType returns the name of the device adapter that service this device type
+func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		return adapterID, nil
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if deviceType == adapterAgent.adapter.Type {
+			return adapterAgent.adapter.Type, nil
+		}
 	}
 	return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
 }
@@ -319,16 +299,12 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypeToAdapterMap))
-	for deviceTypeID, adapterID := range aMgr.deviceTypeToAdapterMap {
-		if adapterAgent, have := aMgr.adapterAgents[adapterID]; have {
-			if deviceType := adapterAgent.getDeviceType(deviceTypeID); deviceType != nil {
-				if deviceType.Id != SentinelDevicetypeID { // don't report the sentinel
-					deviceTypes = append(deviceTypes, deviceType)
-				}
-			}
-		}
+	deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
+
+	for _, deviceType := range aMgr.deviceTypes {
+		deviceTypes = append(deviceTypes, deviceType)
 	}
+
 	return deviceTypes
 }
 
@@ -337,10 +313,9 @@
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 
-	if adapterID, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
-		if adapterAgent := aMgr.adapterAgents[adapterID]; adapterAgent != nil {
-			return adapterAgent.getDeviceType(deviceType)
-		}
+	if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
+		return deviceType
 	}
+
 	return nil
 }
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index d3907bb..1e18ba4 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,14 +30,16 @@
 	deviceTopicRegistered bool
 	corePairTopic         string
 	kafkaICProxy          kafka.InterContainerProxy
+	endpointManager       kafka.EndpointManager
 }
 
 // NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
 	return &AdapterProxy{
 		kafkaICProxy:          kafkaProxy,
 		corePairTopic:         corePairTopic,
 		deviceTopicRegistered: false,
+		endpointManager:       endpointManager,
 	}
 }
 
@@ -45,8 +47,14 @@
 	return kafka.Topic{Name: ap.corePairTopic}
 }
 
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
-	return kafka.Topic{Name: adapterName}
+func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
+
+	endpoint, err := ap.endpointManager.GetEndpoint(deviceID, adapterType)
+	if err != nil {
+		return nil, err
+	}
+
+	return &kafka.Topic{Name: string(endpoint)}, nil
 }
 
 func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
@@ -69,167 +77,210 @@
 func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // disableDevice invokes disable device rpc
 func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reEnableDevice invokes reenable device rpc
 func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // rebootDevice invokes reboot device rpc
 func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // deleteDevice invokes delete device rpc
 func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpDeviceInfo invokes get ofp device info rpc
 func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
 	rpc := "get_ofp_device_info"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getOfpPortInfo invokes get ofp port info rpc
 func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // reconcileDevice invokes reconcile device rpc
 func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // downloadImage invokes download image rpc
 func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // getImageDownloadStatus invokes get image download status rpc
 func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // cancelImageDownload invokes cancel image download rpc
 func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // activateImageUpdate invokes activate image update rpc
 func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // revertImageUpdate invokes revert image update rpc
 func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: download},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "receive_packet_out"
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
@@ -237,13 +288,16 @@
 		{Key: "packet", Value: packet},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 // updateFlowsBulk invokes update flows bulk rpc
 func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_bulk"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -252,7 +306,7 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updateFlowsIncremental invokes update flows incremental rpc
@@ -266,7 +320,10 @@
 			"group-to-delete-count": len(groupChanges.ToRemove.Items),
 			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "update_flows_incrementally"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
@@ -275,83 +332,101 @@
 		{Key: "flow_metadata", Value: flowMetadata},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // updatePmConfigs invokes update pm configs rpc
 func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	rpc := "Update_pm_config"
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "pm_configs", Value: pmConfigs},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // simulateAlarm invokes simulate alarm rpc
 func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 		{Key: "request", Value: simulateReq},
 	}
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
 		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
 		{Key: "port", Value: port},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
 }
 
 // childDeviceLost invokes child device_lost rpc
-func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
-	logger.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+	logger.Debugw("childDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
-	toTopic := ap.getAdapterTopic(deviceType)
+	toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+	if err != nil {
+		return nil, err
+	}
 	args := []*kafka.KVArg{
-		{Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+		{Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
 		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
 		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
 	}
 	replyToTopic := ap.getCoreTopic()
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
 }
 
 func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
 	logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
 	rpc := "start_omci_test"
-	toTopic := ap.getAdapterTopic(device.Adapter)
+	toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+	if err != nil {
+		return nil, err
+	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
 	//   than including the whole request, which is (deviceid, uuid)
-	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id,
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
 		&kafka.KVArg{Key: "device", Value: device},
 		&kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
 }
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 8784ff2..718cc30 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -24,7 +24,7 @@
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -60,7 +60,7 @@
 	var err error
 
 	// Create the KV client
-	kc = lm.NewKafkaClient()
+	kc = mock_kafka.NewKafkaClient()
 
 	// Setup core inter-container proxy and core request handler
 	coreKafkaICProxy = kafka.NewInterContainerProxy(
@@ -98,7 +98,7 @@
 }
 
 func TestCreateAdapterProxy(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	assert.NotNil(t, ap)
 }
 
@@ -119,7 +119,7 @@
 
 func testSimpleRequests(t *testing.T) {
 	type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	simpleRequests := []simpleRequest{
 		ap.adoptDevice,
 		ap.disableDevice,
@@ -162,7 +162,7 @@
 }
 
 func testGetSwitchCapabilityFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -179,7 +179,7 @@
 }
 
 func testGetPortInfoFromAdapter(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -197,7 +197,7 @@
 }
 
 func testPacketOut(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	outPort := uint32(1)
 	packet, err := getRandomBytes(50)
@@ -211,7 +211,7 @@
 }
 
 func testFlowUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	_, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
@@ -226,7 +226,7 @@
 }
 
 func testPmUpdates(t *testing.T) {
-	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
+	ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
@@ -236,7 +236,7 @@
 	assert.Nil(t, err)
 }
 
-func TestSuite(t *testing.T) {
+func TestSuiteAdapterProxy(t *testing.T) {
 	//1. Test the simple requests first
 	testSimpleRequests(t)
 
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index 9771619..7591c18 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -30,7 +30,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
 	"google.golang.org/grpc/codes"
@@ -75,7 +75,7 @@
 }
 
 //startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
+func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
 	kvClientPort, err := freeport.GetFreePort()
 	if err != nil {
 		return nil, 0, err
@@ -84,14 +84,14 @@
 	if err != nil {
 		return nil, 0, err
 	}
-	etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
 	if etcdServer == nil {
 		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
 	}
 	return etcdServer, kvClientPort, nil
 }
 
-func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
+func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
 	if server != nil {
 		server.Stop()
 	}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 69391fb..dccd271 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -254,15 +254,16 @@
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
-	adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
+	adapterName, err := agent.adapterMgr.getAdapterType(cloned.Type)
 	if err != nil {
 		return err
 	}
 	cloned.Adapter = adapterName
 
 	if cloned.AdminState == voltha.AdminState_ENABLED {
-		logger.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
-		return nil
+		logger.Warnw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
+		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
+		return err
 	}
 
 	if cloned.AdminState == voltha.AdminState_DELETED {
@@ -1665,14 +1666,18 @@
 	}
 
 	device := agent.getDeviceWithoutLock()
-	adapterName, err := agent.adapterMgr.getAdapterName(device.Type)
-	if err != nil {
-		agent.requestQueue.RequestComplete()
-		return nil, err
+
+	if device.Adapter == "" {
+		adapterName, err := agent.adapterMgr.getAdapterType(device.Type)
+		if err != nil {
+			agent.requestQueue.RequestComplete()
+			return nil, err
+		}
+
+		device.Adapter = adapterName
 	}
 
 	// Send request to the adapter
-	device.Adapter = adapterName
 	ch, err := agent.adapterProxy.startOmciTest(ctx, device, omcitestrequest)
 	agent.requestQueue.RequestComplete()
 	if err != nil {
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index 4f1506f..f8fb810 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -21,7 +21,8 @@
 	"github.com/opencord/voltha-go/rw_core/config"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -35,7 +36,7 @@
 )
 
 type DATest struct {
-	etcdServer     *lm.EtcdServer
+	etcdServer     *mock_etcd.EtcdServer
 	core           *Core
 	kClient        kafka.Client
 	kvClientPort   int
@@ -57,7 +58,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index afe84e8..18593d8 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -56,12 +56,15 @@
 }
 
 func newDeviceManager(core *Core) *DeviceManager {
+
+	endpointManager := kafka.NewEndpointManager(&core.backend)
+
 	var deviceMgr DeviceManager
 	deviceMgr.core = core
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.rootDevices = make(map[string]bool)
 	deviceMgr.kafkaICProxy = core.kmp
-	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
+	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic, endpointManager)
 	deviceMgr.coreInstanceID = core.instanceID
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
@@ -186,7 +189,6 @@
 	} else {
 		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-
 	sendResponse(ctx, ch, res)
 }
 
@@ -601,7 +603,8 @@
 
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
-	logger.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
+	logger.Debugw("adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	// Let's reconcile the device managed by this Core only
 	if len(dMgr.rootDevices) == 0 {
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 0490c5f..e54c14c 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -34,7 +34,8 @@
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/version"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -45,7 +46,7 @@
 )
 
 type NBTest struct {
-	etcdServer        *lm.EtcdServer
+	etcdServer        *mock_etcd.EtcdServer
 	core              *Core
 	kClient           kafka.Client
 	kvClientPort      int
@@ -69,7 +70,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-nbi-test"
@@ -114,9 +115,13 @@
 
 	//	Register the adapter
 	registrationData := &voltha.Adapter{
-		Id:      nb.oltAdapterName,
-		Vendor:  "Voltha-olt",
-		Version: version.VersionInfo.Version,
+		Id:             nb.oltAdapterName,
+		Vendor:         "Voltha-olt",
+		Version:        version.VersionInfo.Version,
+		Type:           nb.oltAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       nb.oltAdapterName,
 	}
 	types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
@@ -134,9 +139,13 @@
 
 	//	Register the adapter
 	registrationData = &voltha.Adapter{
-		Id:      nb.onuAdapterName,
-		Vendor:  "Voltha-onu",
-		Version: version.VersionInfo.Version,
+		Id:             nb.onuAdapterName,
+		Vendor:         "Voltha-onu",
+		Version:        version.VersionInfo.Version,
+		Type:           nb.onuAdapterName,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+		Endpoint:       nb.onuAdapterName,
 	}
 	types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes = &voltha.DeviceTypes{Items: types}
@@ -1108,7 +1117,7 @@
 	assert.Nil(t, err)
 }
 
-func TestSuite1(t *testing.T) {
+func TestSuiteNbiApiHandler(t *testing.T) {
 	f, err := os.Create("profile.cpu")
 	if err != nil {
 		logger.Fatalf("could not create CPU profile: %v\n ", err)
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 175fe06..70d809a 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -27,7 +27,8 @@
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -357,7 +358,7 @@
 }
 
 type LDATest struct {
-	etcdServer     *lm.EtcdServer
+	etcdServer     *mock_etcd.EtcdServer
 	core           *Core
 	kClient        kafka.Client
 	kvClientPort   int
@@ -380,7 +381,7 @@
 		logger.Fatal(err)
 	}
 	// Create the kafka client
-	test.kClient = lm.NewKafkaClient()
+	test.kClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"