[VOL-1499] Use precreated topic

This commit migrates from dynamically created Kafka topics to
pre-created topics.  The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index f076910..a503c97 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -25,12 +25,16 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"sync"
 )
 
 type CoreProxy struct {
 	kafkaICProxy *kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	// deviceIdCoreMap maps a device id to its core topic name; guarded by lockDeviceIdCoreMap.
+	deviceIdCoreMap     map[string]string
+	lockDeviceIdCoreMap sync.RWMutex
 }
 
 func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -38,6 +42,8 @@
 	proxy.kafkaICProxy = kafkaProxy
 	proxy.adapterTopic = adapterTopic
 	proxy.coreTopic = coreTopic
+	proxy.deviceIdCoreMap = make(map[string]string)
+	proxy.lockDeviceIdCoreMap = sync.RWMutex{}
 	log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 
 	return &proxy
@@ -58,11 +64,40 @@
 	}
 }
 
+// UpdateCoreReference adds or updates the core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes the core reference (really the topic name) for a given device Id; no-op if none is stored
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	delete(ap.deviceIdCoreMap, deviceId)
+}
+
+// getCoreTopic returns the core topic recorded for deviceId, or the default core topic when none is recorded.
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+	ap.lockDeviceIdCoreMap.RLock() // read-only lookup: a shared lock lets concurrent callers proceed
+	defer ap.lockDeviceIdCoreMap.RUnlock()
+	if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+		return kafka.Topic{Name: t}
+	}
+
+	return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+	return kafka.Topic{Name: ap.adapterTopic} // args is not used: the adapter's own topic is always returned
+}
+
 func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
 	log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
 	rpc := "Register"
 	topic := kafka.Topic{Name: ap.coreTopic}
-	replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
@@ -81,16 +116,14 @@
 func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
 	rpc := "DeviceUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+	toTopic := ap.getCoreTopic(device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -101,7 +134,7 @@
 	rpc := "PortCreated"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 2)
 	id := &voltha.ID{Id: deviceId}
 	args[0] = &kafka.KVArg{
@@ -114,7 +147,7 @@
 	}
 
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -126,7 +159,7 @@
 	rpc := "DeviceStateUpdate"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 3)
 	id := &voltha.ID{Id: deviceId}
 	oStatus := &ic.IntType{Val: int64(operStatus)}
@@ -145,7 +178,7 @@
 		Value: cStatus,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -157,8 +190,8 @@
 	rpc := "ChildDeviceDetected"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
 
 	args := make([]*kafka.KVArg, 4)
 	id := &voltha.ID{Id: parentDeviceId}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index b3606b0..4d65d89 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -32,12 +32,14 @@
 	TestMode       bool
 	coreInstanceId string
 	adapter        adapters.IAdapter
+	coreProxy *CoreProxy
 }
 
-func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter) *RequestHandlerProxy {
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy *CoreProxy) *RequestHandlerProxy {
 	var proxy RequestHandlerProxy
 	proxy.coreInstanceId = coreInstanceId
 	proxy.adapter = iadapter
+	proxy.coreProxy = cProxy
 	return &proxy
 }
 
@@ -54,13 +56,14 @@
 }
 
 func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	device := &voltha.Device{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device":
@@ -73,11 +76,19 @@
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 
 	log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
 
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+
 	//Invoke the adopt device on the adapter
 	if err := rhp.adapter.Adopt_device(device); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -143,7 +154,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -177,7 +188,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -214,7 +225,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index edf3135..529b89d 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -96,16 +96,6 @@
 	return nil
 }
 
-func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
-	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
-	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
-		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
 	if device == nil {
 		log.Warn("device-is-nil")
@@ -117,8 +107,6 @@
 		handler := NewDeviceHandler(so.coreProxy, device, so)
 		so.addDeviceHandlerToMap(handler)
 		go handler.AdoptDevice(device)
-		// Launch the creation of the device topic
-		go so.createDeviceTopic(device)
 	}
 	return nil
 }
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 01234a9..ea6fb6c 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -91,7 +91,7 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
@@ -218,9 +218,9 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
 	log.Info("setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
 		log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
 		return err
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 2d43944..c10b844 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -94,16 +94,6 @@
 	return nil
 }
 
-func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
-	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
-	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
-		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
 	if device == nil {
 		log.Warn("device-is-nil")
@@ -115,8 +105,6 @@
 		handler := NewDeviceHandler(so.coreProxy, device, so)
 		so.addDeviceHandlerToMap(handler)
 		go handler.AdoptDevice(device)
-		// Launch the creation of the device topic
-		go so.createDeviceTopic(device)
 	}
 	return nil
 }
diff --git a/adapters/simulated_onu/main.go b/adapters/simulated_onu/main.go
index 62cf2db..a5c18aa 100644
--- a/adapters/simulated_onu/main.go
+++ b/adapters/simulated_onu/main.go
@@ -91,7 +91,7 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
@@ -218,9 +218,9 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
 	log.Info("setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
 		log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
 		return err