[VOL-1499] Use precreated topic

This commit migrates from dynamically created Kafka topics to
pre-created topics.  The changes are made in the rw_core, the simulated
onu and olt adapters, and the ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index f076910..a503c97 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -25,12 +25,16 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"sync"
 )
 
 type CoreProxy struct {
 	kafkaICProxy *kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	deviceIdCoreMap map[string]string // device Id -> core reference (topic name), consulted by getCoreTopic
+	lockDeviceIdCoreMap sync.RWMutex // guards deviceIdCoreMap
+
 }
 
 func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -38,6 +42,8 @@
 	proxy.kafkaICProxy = kafkaProxy
 	proxy.adapterTopic = adapterTopic
 	proxy.coreTopic = coreTopic
+	proxy.deviceIdCoreMap = make(map[string]string)
+	proxy.lockDeviceIdCoreMap = sync.RWMutex{}
 	log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 
 	return &proxy
@@ -58,11 +64,40 @@
 	}
 }
 
+// UpdateCoreReference adds or updates the core reference (really the topic name) for a given device Id.
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes the core reference (really the topic name) for a given device Id; it is a no-op if none is stored.
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	delete(ap.deviceIdCoreMap, deviceId)
+}
+
+// getCoreTopic returns the core topic registered for deviceId, or the default core topic if none is registered.
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+	ap.lockDeviceIdCoreMap.RLock() // read-only lookup: a shared lock lets concurrent lookups proceed
+	defer ap.lockDeviceIdCoreMap.RUnlock()
+	if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+		return kafka.Topic{Name: t}
+	}
+	// No device-specific core reference stored: fall back to the default core topic.
+	return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+	return kafka.Topic{Name: ap.adapterTopic} // args is ignored: the single adapter topic is always returned
+}
+
 func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
 	log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
 	rpc := "Register"
 	topic := kafka.Topic{Name: ap.coreTopic}
-	replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
@@ -81,16 +116,14 @@
 func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
 	rpc := "DeviceUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+	toTopic := ap.getCoreTopic(device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -101,7 +134,7 @@
 	rpc := "PortCreated"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 2)
 	id := &voltha.ID{Id: deviceId}
 	args[0] = &kafka.KVArg{
@@ -114,7 +147,7 @@
 	}
 
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -126,7 +159,7 @@
 	rpc := "DeviceStateUpdate"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 3)
 	id := &voltha.ID{Id: deviceId}
 	oStatus := &ic.IntType{Val: int64(operStatus)}
@@ -145,7 +178,7 @@
 		Value: cStatus,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -157,8 +190,8 @@
 	rpc := "ChildDeviceDetected"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
 
 	args := make([]*kafka.KVArg, 4)
 	id := &voltha.ID{Id: parentDeviceId}