[VOL-1442] This commit handles adapter registration end to end.
It introduces an adapter manager to handle all adapter admin
requests. Some protos have been cleaned up as well.

Change-Id: If75d4f7665c03e841d57f5621c30301940d04d93
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 9d029fc..9ddda85 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -54,22 +54,10 @@
 	}
 }
 
-//func kafka.CreateSubTopic(args ...string) kafka.Topic{
-//	topic := ""
-//	for index , arg := range args {
-//		if index == 0 {
-//			topic = arg
-//		} else {
-//			topic = fmt.Sprintf("%s_%s",  topic, arg)
-//		}
-//	}
-//	return kafka.Topic{Name:topic}
-//}
-
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
-	topic := kafka.Topic{Name: device.Type}
+	topic := kafka.Topic{Name: device.Adapter}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -83,14 +71,6 @@
 	}
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	//if success {
-	//	// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
-	//	// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
-	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
-	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-	//		return err
-	//	}
-	//}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
@@ -99,7 +79,7 @@
 	rpc := "disable_device"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -115,7 +95,7 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -131,7 +111,7 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -147,7 +127,7 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -169,7 +149,7 @@
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -200,7 +180,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -269,7 +249,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -290,7 +270,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -327,7 +307,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -348,7 +328,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -369,7 +349,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -422,7 +402,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -447,7 +427,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{