[VOL-1499] Use precreated topic

This commit migrate from dynamically created kafka topic to
pre-created topic.  The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ed09c9d..b3ba10d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,12 +30,15 @@
 
 type AdapterProxy struct {
 	TestMode     bool
+	deviceTopicRegistered bool
+	coreTopic *kafka.Topic
 	kafkaICProxy *kafka.InterContainerProxy
 }
 
 func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
 	proxy.kafkaICProxy = kafkaProxy
+	proxy.deviceTopicRegistered = false
 	return &proxy
 }
 
@@ -55,22 +58,42 @@
 	}
 }
 
+func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
+	ap.coreTopic = coreTopic
+}
+
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+	if ap.coreTopic != nil {
+		return *ap.coreTopic
+	}
+	return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+}
+
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+	return kafka.Topic{Name: adapterName}
+}
+
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
-	topic := kafka.Topic{Name: device.Adapter}
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	//topic := kafka.Topic{Name: device.Adapter}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
-	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
-		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-		return err
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	replyToTopic := ap.getCoreTopic()
+	//if !ap.deviceTopicRegistered {
+	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
+	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+	//		return err
+	//	}
+	//}
+	ap.deviceTopicRegistered = true
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -78,16 +101,19 @@
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	//toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -96,14 +122,14 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -112,14 +138,14 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -128,14 +154,14 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 
@@ -150,14 +176,14 @@
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
@@ -181,7 +207,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -193,7 +219,7 @@
 		Value: pNo,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
@@ -250,7 +276,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -261,7 +287,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -271,7 +297,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -282,7 +308,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -308,7 +334,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -319,7 +345,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -329,7 +355,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -340,7 +366,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -350,7 +376,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -361,7 +387,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -375,7 +401,7 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
+	toTopic := ap.getAdapterTopic(deviceType)
 	rpc := "receive_packet_out"
 	dId := &ic.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
@@ -395,15 +421,15 @@
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -420,7 +446,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -428,7 +454,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -445,7 +471,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)