[VOL-1499] Use precreated topic

This commit migrates from dynamically created Kafka topics to
pre-created topics.  The changes are made in the rw_core, the simulated
ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: move the Python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ed09c9d..b3ba10d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,12 +30,15 @@
 
 type AdapterProxy struct {
 	TestMode     bool
+	deviceTopicRegistered bool
+	coreTopic *kafka.Topic
 	kafkaICProxy *kafka.InterContainerProxy
 }
 
 func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
 	proxy.kafkaICProxy = kafkaProxy
+	proxy.deviceTopicRegistered = false
 	return &proxy
 }
 
@@ -55,22 +58,42 @@
 	}
 }
 
+func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
+	ap.coreTopic = coreTopic
+}
+
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+	if ap.coreTopic != nil {
+		return *ap.coreTopic
+	}
+	return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+}
+
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+	return kafka.Topic{Name: adapterName}
+}
+
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
-	topic := kafka.Topic{Name: device.Adapter}
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	//topic := kafka.Topic{Name: device.Adapter}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
-	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
-		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-		return err
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	replyToTopic := ap.getCoreTopic()
+	//if !ap.deviceTopicRegistered {
+	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
+	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+	//		return err
+	//	}
+	//}
+	ap.deviceTopicRegistered = true
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -78,16 +101,19 @@
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	//toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -96,14 +122,14 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -112,14 +138,14 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -128,14 +154,14 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 
@@ -150,14 +176,14 @@
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
@@ -181,7 +207,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -193,7 +219,7 @@
 		Value: pNo,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
@@ -250,7 +276,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -261,7 +287,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -271,7 +297,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -282,7 +308,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -308,7 +334,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -319,7 +345,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -329,7 +355,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -340,7 +366,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -350,7 +376,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -361,7 +387,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -375,7 +401,7 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
+	toTopic := ap.getAdapterTopic(deviceType)
 	rpc := "receive_packet_out"
 	dId := &ic.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
@@ -395,15 +421,15 @@
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -420,7 +446,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -428,7 +454,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -445,7 +471,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 88b622e..1be0cff 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -81,7 +81,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -130,7 +130,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -198,7 +198,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -292,7 +292,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -330,7 +330,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -623,7 +623,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -674,7 +674,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 838235d..d576a6e 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,6 +17,8 @@
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
@@ -43,6 +45,7 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
+	coreMembershipRegistered bool
 }
 
 func init() {
@@ -72,6 +75,7 @@
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+	core.coreMembershipRegistered = false
 	return &core
 }
 
@@ -84,7 +88,8 @@
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+
+	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
 	}
 	go core.startDeviceManager(ctx)
@@ -113,7 +118,8 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	//core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	core.grpcNBIAPIHandler = NewAPIHandler(core)
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
@@ -154,17 +160,57 @@
 	return nil
 }
 
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+//	) error {
+//	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+//		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+//	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+//
+//	log.Info("request-handlers")
+//	return nil
+//}
+
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
-	) error {
+) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
 		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
-	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
-	log.Info("request-handlers")
+	// Register the broadcast topic to handle any core-bound broadcast requests
+	if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
+		log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
+		return err
+	}
+
+	log.Info("request-handler-registered")
 	return nil
 }
 
+
+func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+	// Sanity check - can only register once
+	if core.coreMembershipRegistered {
+		return errors.New("Can-only-set-once")
+	}
+
+	go func(coreTopicSuffix string) {
+		// Register the core-pair topic to handle core-bound requests aimed to the core pair
+		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+		// TODO: Set a retry here to ensure this subscription happens
+		if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
+			log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
+		}
+		// Update the CoreTopic to use by the adapter proxy
+		core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
+	}(coreTopicSuffix)
+
+	core.coreMembershipRegistered = true
+	log.Info("request-handlers-registered")
+	return nil
+}
+
+
 func (core *Core) startDeviceManager(ctx context.Context) {
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
 	// callbacks.  For now, until the model is ready, devicemanager will keep a reference to the
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f72d615..46382c2 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -60,18 +60,34 @@
 	longRunningRequestTimeout int64
 	defaultRequestTimeout int64
 	da.DefaultAPIHandler
+	core *Core
 }
 
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//	handler := &APIHandler{
+//		deviceMgr:        deviceMgr,
+//		logicalDeviceMgr: lDeviceMgr,
+//		adapterMgr:adapterMgr,
+//		coreInCompetingMode:inCompetingMode,
+//		longRunningRequestTimeout:longRunningRequestTimeout,
+//		defaultRequestTimeout:defaultRequestTimeout,
+//		// TODO: Figure out what the 'hint' parameter to queue.New does
+//		packetInQueue: queue.New(10),
+//	}
+//	return handler
+//}
+
+func NewAPIHandler(core *Core) *APIHandler {
 	handler := &APIHandler{
-		deviceMgr:        deviceMgr,
-		logicalDeviceMgr: lDeviceMgr,
-		adapterMgr:adapterMgr,
-		coreInCompetingMode:inCompetingMode,
-		longRunningRequestTimeout:longRunningRequestTimeout,
-		defaultRequestTimeout:defaultRequestTimeout,
+		deviceMgr:        core.deviceMgr,
+		logicalDeviceMgr: core.logicalDeviceMgr,
+		adapterMgr: core.adapterMgr,
+		coreInCompetingMode: core.config.InCompetingMode,
+		longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
+		defaultRequestTimeout:core.config.DefaultRequestTimeout,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
 		packetInQueue: queue.New(10),
+		core: core,
 	}
 	return handler
 }
@@ -186,6 +202,16 @@
 	return out, nil
 }
 
+
+func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
+	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
+	out := new(empty.Empty)
+	if err := handler.core.registerCoreTopic(ctx, membership.GroupName); err != nil {
+		return out, err
+	}
+	return out, nil
+}
+
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 1b2fb1e..e313b77 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -306,7 +306,7 @@
 }
 
 func (ldMgr *LogicalDeviceManager) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
-	log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
+	log.Debugw("AddUNILogicalPort", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId})
 	// Sanity check
 	if childDevice.Root {
 		return errors.New("Device-root")