[VOL-1442]  This commit handles adapter registration end to end.
It introduces an adapter manager to handle all adapter admin
requests.  Some protos have been cleaned as well.

Change-Id: If75d4f7665c03e841d57f5621c30301940d04d93
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
new file mode 100644
index 0000000..c0ec1e9
--- /dev/null
+++ b/rw_core/core/adapter_manager.go
@@ -0,0 +1,370 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"reflect"
+	"sync"
+)
+
+const (
+	SENTINEL_ADAPTER_ID = "adapter_sentinel"
+	SENTINEL_DEVICETYPE_ID = "device_type_sentinel"
+
+)
+
+type AdapterAgent struct {
+	adapter *voltha.Adapter
+	deviceTypes map[string]*voltha.DeviceType
+	lock sync.RWMutex
+}
+
+func newAdapterAgent(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *AdapterAgent {
+	var adapterAgent AdapterAgent
+	adapterAgent.adapter = adapter
+	adapterAgent.lock = sync.RWMutex{}
+	adapterAgent.deviceTypes = make(map[string]*voltha.DeviceType)
+	if deviceTypes != nil {
+		for _, dType := range deviceTypes.Items {
+			adapterAgent.deviceTypes[dType.Id] = dType
+		}
+	}
+	return &adapterAgent
+}
+
+// Returns true if this device agent can handle this device Type
+func (aa *AdapterAgent) handlesDeviceType(deviceType string) bool {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	_, exist := aa.deviceTypes[deviceType]
+	return exist
+}
+
+func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	if _, exist := aa.deviceTypes[deviceType]; exist {
+		return aa.deviceTypes[deviceType]
+	}
+	return nil
+}
+
+func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	log.Debugw("getAdapter", log.Fields{"adapter": aa.adapter})
+	return aa.adapter
+}
+
+func (aa *AdapterAgent) updateAdapter(adapter *voltha.Adapter) {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	aa.adapter = adapter
+}
+
+func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType)  {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	aa.deviceTypes[deviceType.Id] = deviceType
+}
+
+type AdapterManager struct {
+	adapterAgents               map[string]*AdapterAgent
+	deviceTypeToAdapterMap      map[string]string
+	clusterDataProxy            *model.Proxy
+	adapterProxy                *model.Proxy
+	deviceTypeProxy             *model.Proxy
+	coreInstanceId              string
+	exitChannel                 chan int
+	lockAdaptersMap             sync.RWMutex
+	lockdDeviceTypeToAdapterMap sync.RWMutex
+}
+
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceId string) *AdapterManager {
+	var adapterMgr AdapterManager
+	adapterMgr.exitChannel = make(chan int, 1)
+	adapterMgr.coreInstanceId = coreInstanceId
+	adapterMgr.clusterDataProxy = cdProxy
+	adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
+	adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
+	adapterMgr.lockAdaptersMap = sync.RWMutex{}
+	adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
+	return &adapterMgr
+}
+
+func (aMgr *AdapterManager) start(ctx context.Context)  {
+	log.Info("starting-adapter-manager")
+
+	// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
+	// created if there are no data in the dB to start
+	aMgr.loadAdaptersAndDevicetypesInMemory()
+
+	//// Create the proxies
+	aMgr.adapterProxy = aMgr.clusterDataProxy.Root.CreateProxy("/adapters", false)
+	aMgr.deviceTypeProxy = aMgr.clusterDataProxy.Root.CreateProxy("/device_types", false)
+
+	// Register the callbacks
+	aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
+	aMgr.deviceTypeProxy.RegisterCallback(model.POST_UPDATE, aMgr.deviceTypesUpdated)
+
+	log.Info("adapter-manager-started")
+}
+
+func (aMgr *AdapterManager) stop(ctx context.Context) {
+	log.Info("stopping-device-manager")
+	aMgr.exitChannel <- 1
+	log.Info("device-manager-stopped")
+}
+
+//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
+func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
+	// Load the adapters
+	if adaptersIf := aMgr.clusterDataProxy.Get("/adapters", 0, false, ""); adaptersIf != nil {
+		for _, adapterIf := range adaptersIf.([]interface{}) {
+			if adapter, ok := adapterIf.(*voltha.Adapter); ok {
+				log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
+				aMgr.addAdapter(adapter, false)
+			}
+		}
+	} else {
+		log.Debug("no-existing-adapter-found")
+		//	No adapter data.   In order to have a proxy setup for that path let's create a fake adapter
+		aMgr.addAdapter(&voltha.Adapter{Id: SENTINEL_ADAPTER_ID}, true)
+	}
+
+	// Load the device types
+	if deviceTypesIf := aMgr.clusterDataProxy.Get("/device_types", 0, false, ""); deviceTypesIf != nil {
+		dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
+		for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
+			if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
+				log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+				dTypes.Items = append(dTypes.Items, dType)
+			}
+		}
+		aMgr.addDeviceTypes(dTypes, false)
+	} else {
+		log.Debug("no-existing-device-type-found")
+		//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
+		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items:[]*voltha.DeviceType{&voltha.DeviceType{Id:SENTINEL_DEVICETYPE_ID, Adapter:SENTINEL_ADAPTER_ID}}}, true)
+	}
+}
+
+func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	log.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+	if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
+		clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, nil)
+		if saveToDb {
+			// Save the adapter to the KV store - first check if it already exist
+			if kvAdapter := aMgr.clusterDataProxy.Get("/adapters/"+adapter.Id, 0, false, ""); kvAdapter == nil {
+				if added := aMgr.clusterDataProxy.AddWithID("/adapters", adapter.Id, clonedAdapter, ""); added == nil {
+					//TODO:  Errors when saving to KV would require a separate go routine to be launched and try the saving again
+					log.Errorw("failed-to-save-adapter", log.Fields{"adapter": adapter})
+				} else {
+					log.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+				}
+			}
+		}
+	}
+}
+
+
+func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) {
+	if deviceTypes == nil {
+		return
+	}
+	log.Debugw("adding-device-types", log.Fields{"deviceTypes": deviceTypes})
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	for _, deviceType := range deviceTypes.Items {
+		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
+		if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
+			adapterAgent.updateDeviceType(clonedDType)
+		} else {
+			log.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
+			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id:clonedDType.Adapter}, deviceTypes)
+		}
+		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
+	}
+	if saveToDb {
+		// Save the device types to the KV store as well
+		for _, deviceType := range deviceTypes.Items {
+			if dType := aMgr.clusterDataProxy.Get("/device_types/"+deviceType.Id, 0, false, ""); dType == nil {
+				//	Does not exist - save it
+				clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
+				if added := aMgr.clusterDataProxy.AddWithID("/device_types", deviceType.Id, clonedDType, ""); added == nil {
+					log.Errorw("failed-to-save-deviceType", log.Fields{"deviceType": deviceType})
+				} else {
+					log.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
+				}
+			}
+		}
+	}
+}
+
+func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
+	result := &voltha.Adapters{Items:[]*voltha.Adapter{}}
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if a := adapterAgent.getAdapter(); a != nil {
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+		}
+	}
+	return result, nil
+}
+
+func (aMgr *AdapterManager) deleteAdapter(adapterId string) {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	delete(aMgr.adapterAgents, adapterId)
+}
+
+func (aMgr *AdapterManager) getAdapter(adapterId string) *voltha.Adapter {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	if adapterAgent, ok := aMgr.adapterAgents[adapterId]; ok {
+		return adapterAgent.getAdapter()
+	}
+	return nil
+}
+
+//updateAdapter updates an adapter if it exist.  Otherwise, it creates it.
+func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter)  {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	if adapterAgent, ok := aMgr.adapterAgents[adapter.Id]; ok {
+		adapterAgent.updateAdapter(adapter)
+	} else {
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(adapter, nil)
+	}
+}
+
+//updateDeviceType updates an adapter if it exist.  Otherwise, it creates it.
+func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType)  {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; !exist {
+		adapterAgent.updateDeviceType(deviceType)
+	} else {
+		aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
+														&voltha.DeviceTypes{Items:[]*voltha.DeviceType{deviceType}})
+	}
+	aMgr.deviceTypeToAdapterMap[deviceType.Id] = deviceType.Adapter
+}
+
+func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes  *voltha.DeviceTypes) *voltha.CoreInstance {
+	log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+
+	if aMgr.getAdapter(adapter.Id) != nil {
+		//	Already registered
+		return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+	}
+	// Save the adapter and the device types
+	aMgr.addAdapter(adapter, true)
+	aMgr.addDeviceTypes(deviceTypes, true)
+
+	log.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+
+	return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+}
+
+//getAdapterName returns the name of the device adapter that service this device type
+func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	if adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
+		return adapterId, nil
+	}
+	return "", errors.New(fmt.Sprintf("Adapter-not-registered-for-device-type %s", deviceType))
+}
+
+// getDeviceType returns the device type proto definition given the name of the device type
+func (aMgr *AdapterManager) getDeviceType(deviceType string)  *voltha.DeviceType {
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	if adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
+		if adapterAgent, _ := aMgr.adapterAgents[adapterId]; adapterAgent != nil {
+			return adapterAgent.getDeviceType(deviceType)
+		}
+	}
+	return nil
+}
+
+//adapterUpdated is a callback invoked when an adapter change has been noticed
+func (aMgr *AdapterManager) adapterUpdated(args ...interface{}) interface{} {
+	log.Debugw("updateAdapter-callback", log.Fields{"argsLen": len(args)})
+
+	var previousData *voltha.Adapters
+	var latestData *voltha.Adapters
+
+	var ok bool
+	if previousData, ok = args[0].(*voltha.Adapters); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+	}
+	if latestData, ok = args[1].(*voltha.Adapters); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+	}
+
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debug("update-not-required")
+		return nil
+	}
+
+	for _, adapter := range latestData.Items {
+		aMgr.updateAdapter(adapter)
+	}
+	return nil
+}
+
+//deviceTypesUpdated is a callback invoked when a device type change has been noticed
+func (aMgr *AdapterManager) deviceTypesUpdated(args ...interface{}) interface{} {
+	log.Debugw("deviceTypesUpdated-callback", log.Fields{"argsLen": len(args)})
+
+	var previousData *voltha.DeviceTypes
+	var latestData *voltha.DeviceTypes
+
+	var ok bool
+	if previousData, ok = args[0].(*voltha.DeviceTypes); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+	}
+	if latestData, ok = args[1].(*voltha.DeviceTypes); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+	}
+
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debug("update-not-required")
+		return nil
+	}
+
+	for _, dType := range latestData.Items {
+		aMgr.updateDeviceType(dType)
+	}
+	return nil
+}
\ No newline at end of file
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 9d029fc..9ddda85 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -54,22 +54,10 @@
 	}
 }
 
-//func kafka.CreateSubTopic(args ...string) kafka.Topic{
-//	topic := ""
-//	for index , arg := range args {
-//		if index == 0 {
-//			topic = arg
-//		} else {
-//			topic = fmt.Sprintf("%s_%s",  topic, arg)
-//		}
-//	}
-//	return kafka.Topic{Name:topic}
-//}
-
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
-	topic := kafka.Topic{Name: device.Type}
+	topic := kafka.Topic{Name: device.Adapter}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -83,14 +71,6 @@
 	}
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	//if success {
-	//	// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
-	//	// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
-	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
-	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-	//		return err
-	//	}
-	//}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
@@ -99,7 +79,7 @@
 	rpc := "disable_device"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -115,7 +95,7 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -131,7 +111,7 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -147,7 +127,7 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -169,7 +149,7 @@
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -200,7 +180,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -269,7 +249,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -290,7 +270,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -327,7 +307,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -348,7 +328,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -369,7 +349,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -422,7 +402,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -447,7 +427,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 6893179..98cc688 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -33,17 +33,19 @@
 	coreInstanceId   string
 	deviceMgr        *DeviceManager
 	lDeviceMgr       *LogicalDeviceManager
+	adapterMgr *AdapterManager
 	localDataProxy   *model.Proxy
 	clusterDataProxy *model.Proxy
 }
 
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.coreInstanceId = coreInstanceId
 	proxy.deviceMgr = dMgr
 	proxy.lDeviceMgr = ldMgr
 	proxy.clusterDataProxy = cdProxy
 	proxy.localDataProxy = ldProxy
+	proxy.adapterMgr = aMgr
 	return &proxy
 }
 
@@ -70,12 +72,11 @@
 		}
 	}
 	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
-	// TODO process the request and store the data in the KV store
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
 	}
-	return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
+	return rhp.adapterMgr.registerAdapter(adapter, deviceTypes), nil
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1496200..c81141b 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -33,6 +33,7 @@
 	logicalDeviceMgr  *LogicalDeviceManager
 	grpcServer        *grpcserver.GrpcServer
 	grpcNBIAPIHandler *APIHandler
+	adapterMgr *AdapterManager
 	config            *config.RWCoreFlags
 	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
@@ -80,14 +81,16 @@
 		log.Fatal("Failure-starting-kafkaMessagingProxy")
 	}
 	log.Info("values", log.Fields{"kmp": core.kmp})
-	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.instanceId)
+	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
+	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
 	}
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
+	go core.startAdapterManager(ctx)
 
 	log.Info("adaptercore-started")
 }
@@ -110,7 +113,7 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
@@ -152,8 +155,8 @@
 }
 
 func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
-	cdProxy *model.Proxy, ldProxy *model.Proxy) error {
-	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
+	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
 	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
 	log.Info("request-handlers")
@@ -164,13 +167,19 @@
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
 	// callbacks.  For now, until the model is ready, devicemanager will keep a reference to the
 	// logicaldevicemanager to initiate the creation of logical devices
-	log.Info("starting-DeviceManager")
+	log.Info("DeviceManager-Starting...")
 	core.deviceMgr.start(ctx, core.logicalDeviceMgr)
-	log.Info("started-DeviceManager")
+	log.Info("DeviceManager-Started")
 }
 
 func (core *Core) startLogicalDeviceManager(ctx context.Context) {
-	log.Info("starting-Logical-DeviceManager")
+	log.Info("Logical-DeviceManager-Starting...")
 	core.logicalDeviceMgr.start(ctx)
-	log.Info("started-Logical-DeviceManager")
+	log.Info("Logical-DeviceManager-Started")
 }
+
+func (core *Core) startAdapterManager(ctx context.Context) {
+	log.Info("Adapter-Manager-Starting...")
+	core.adapterMgr.start(ctx)
+	log.Info("Adapter-Manager-Started")
+}
\ No newline at end of file
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 31720c1..8bf8664 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -36,6 +36,7 @@
 	deviceType       string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
+	adapterMgr *AdapterManager
 	deviceMgr        *DeviceManager
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
@@ -66,6 +67,7 @@
 	agent.deviceType = cloned.Type
 	agent.lastData = cloned
 	agent.deviceMgr = deviceMgr
+	agent.adapterMgr = deviceMgr.adapterMgr
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
 	agent.lockDevice = sync.RWMutex{}
@@ -136,9 +138,20 @@
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+
 	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
+		// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
+		// pre-provisionned with the required adapter not registered.   At this stage, since we need to communicate
+		// with the adapter then we need to know the adapter that will handle this request
+		if adapterName, err := agent.adapterMgr.getAdapterName(device.Type); err != nil {
+			log.Warnw("no-adapter-registered-for-device-type", log.Fields{"deviceType": device.Type, "deviceAdapter": device.Adapter})
+			return err
+		} else {
+			device.Adapter = adapterName
+		}
+
 		if device.AdminState == voltha.AdminState_ENABLED {
 			log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
 			//TODO:  Needs customized error message
@@ -795,7 +808,7 @@
 }
 
 //flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
+//to the adapterAgents
 func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -829,11 +842,9 @@
 	}
 	groups := device.FlowGroups
 
-	// Send update to adapters
-	// TODO: Check whether the device supports incremental flow changes
-	// Assume false for test
-	acceptsAddRemoveFlowUpdates := false
-	if !acceptsAddRemoveFlowUpdates {
+	// Send update to adapterAgents
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if !dType.AcceptsAddRemoveFlowUpdates {
 		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
 			log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
 			return err
@@ -871,7 +882,7 @@
 }
 
 //groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
+//to the adapterAgents
 func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -905,7 +916,7 @@
 	}
 	flows := device.Flows
 
-	// Send update to adapters
+	// Send update to adapterAgents
 	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 76475f9..376433b 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,6 +34,7 @@
 type DeviceManager struct {
 	deviceAgents        map[string]*DeviceAgent
 	adapterProxy        *AdapterProxy
+	adapterMgr *AdapterManager
 	logicalDeviceMgr    *LogicalDeviceManager
 	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
@@ -43,7 +44,7 @@
 	lockDeviceAgentsMap sync.RWMutex
 }
 
-func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, coreInstanceId string) *DeviceManager {
+func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, adapterMgr *AdapterManager, coreInstanceId string) *DeviceManager {
 	var deviceMgr DeviceManager
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
@@ -51,6 +52,7 @@
 	deviceMgr.kafkaICProxy = kafkaICProxy
 	deviceMgr.coreInstanceId = coreInstanceId
 	deviceMgr.clusterDataProxy = cdProxy
+	deviceMgr.adapterMgr= adapterMgr
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
 }
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index c65178d..877fcb4 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -47,6 +47,7 @@
 type APIHandler struct {
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
+	adapterMgr *AdapterManager
 	packetInQueue    *queue.Queue
 	coreInCompetingMode bool
 	longRunningRequestTimeout int64
@@ -54,10 +55,11 @@
 	da.DefaultAPIHandler
 }
 
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
 	handler := &APIHandler{
 		deviceMgr:        deviceMgr,
 		logicalDeviceMgr: lDeviceMgr,
+		adapterMgr:adapterMgr,
 		coreInCompetingMode:inCompetingMode,
 		longRunningRequestTimeout:longRunningRequestTimeout,
 		defaultRequestTimeout:defaultRequestTimeout,
@@ -301,6 +303,13 @@
 	return handler.logicalDeviceMgr.listLogicalDevices()
 }
 
+
+// ListAdapters returns the contents of all adapters known to the system
+func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
+	log.Debug("ListDevices")
+	return handler.adapterMgr.listAdapters(ctx)
+}
+
 // ListLogicalDevicePorts must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
 	log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 19eb6a4..18bc30a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -291,7 +291,7 @@
 		// Update the device routes - let it run in its own go routine as it can take time
 		go agent.updateRoutes()
 	}
-	return status.Errorf(codes.NotFound, "%s", childDevice.Id)
+	return nil
 }
 
 func (ldMgr *LogicalDeviceManager) updateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {