[VOL-1499] Use precreated topic

This commit migrate from dynamically created kafka topic to
pre-created topic.  The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index f076910..a503c97 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -25,12 +25,16 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"sync"
 )
 
 type CoreProxy struct {
 	kafkaICProxy *kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	deviceIdCoreMap map[string]string
+	lockDeviceIdCoreMap sync.RWMutex
+
 }
 
 func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -38,6 +42,8 @@
 	proxy.kafkaICProxy = kafkaProxy
 	proxy.adapterTopic = adapterTopic
 	proxy.coreTopic = coreTopic
+	proxy.deviceIdCoreMap = make(map[string]string)
+	proxy.lockDeviceIdCoreMap = sync.RWMutex{}
 	log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 
 	return &proxy
@@ -58,11 +64,40 @@
 	}
 }
 
+// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	delete(ap.deviceIdCoreMap, deviceId)
+}
+
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+	ap.lockDeviceIdCoreMap.Lock()
+	defer ap.lockDeviceIdCoreMap.Unlock()
+
+	if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+		return kafka.Topic{Name: t}
+	}
+
+	return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+	return kafka.Topic{Name: ap.adapterTopic}
+}
+
 func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
 	log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
 	rpc := "Register"
 	topic := kafka.Topic{Name: ap.coreTopic}
-	replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
@@ -81,16 +116,14 @@
 func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
 	rpc := "DeviceUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+	toTopic := ap.getCoreTopic(device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -101,7 +134,7 @@
 	rpc := "PortCreated"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 2)
 	id := &voltha.ID{Id: deviceId}
 	args[0] = &kafka.KVArg{
@@ -114,7 +147,7 @@
 	}
 
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -126,7 +159,7 @@
 	rpc := "DeviceStateUpdate"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	toTopic := ap.getCoreTopic(deviceId)
 	args := make([]*kafka.KVArg, 3)
 	id := &voltha.ID{Id: deviceId}
 	oStatus := &ic.IntType{Val: int64(operStatus)}
@@ -145,7 +178,7 @@
 		Value: cStatus,
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	replyToTopic := ap.getAdapterTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
@@ -157,8 +190,8 @@
 	rpc := "ChildDeviceDetected"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
-	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
 
 	args := make([]*kafka.KVArg, 4)
 	id := &voltha.ID{Id: parentDeviceId}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index b3606b0..4d65d89 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -32,12 +32,14 @@
 	TestMode       bool
 	coreInstanceId string
 	adapter        adapters.IAdapter
+	coreProxy *CoreProxy
 }
 
-func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter) *RequestHandlerProxy {
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy *CoreProxy) *RequestHandlerProxy {
 	var proxy RequestHandlerProxy
 	proxy.coreInstanceId = coreInstanceId
 	proxy.adapter = iadapter
+	proxy.coreProxy = cProxy
 	return &proxy
 }
 
@@ -54,13 +56,14 @@
 }
 
 func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	device := &voltha.Device{}
 	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device":
@@ -73,11 +76,19 @@
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 
 	log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
 
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+
 	//Invoke the adopt device on the adapter
 	if err := rhp.adapter.Adopt_device(device); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -143,7 +154,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -177,7 +188,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -214,7 +225,7 @@
 }
 
 func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index edf3135..529b89d 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -96,16 +96,6 @@
 	return nil
 }
 
-func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
-	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
-	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
-		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
 	if device == nil {
 		log.Warn("device-is-nil")
@@ -117,8 +107,6 @@
 		handler := NewDeviceHandler(so.coreProxy, device, so)
 		so.addDeviceHandlerToMap(handler)
 		go handler.AdoptDevice(device)
-		// Launch the creation of the device topic
-		go so.createDeviceTopic(device)
 	}
 	return nil
 }
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 01234a9..ea6fb6c 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -91,7 +91,7 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
@@ -218,9 +218,9 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
 	log.Info("setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
 		log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
 		return err
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 2d43944..c10b844 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -94,16 +94,6 @@
 	return nil
 }
 
-func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
-	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
-	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
-		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
-		return err
-	}
-	return nil
-}
-
 func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
 	if device == nil {
 		log.Warn("device-is-nil")
@@ -115,8 +105,6 @@
 		handler := NewDeviceHandler(so.coreProxy, device, so)
 		so.addDeviceHandlerToMap(handler)
 		go handler.AdoptDevice(device)
-		// Launch the creation of the device topic
-		go so.createDeviceTopic(device)
 	}
 	return nil
 }
diff --git a/adapters/simulated_onu/main.go b/adapters/simulated_onu/main.go
index 62cf2db..a5c18aa 100644
--- a/adapters/simulated_onu/main.go
+++ b/adapters/simulated_onu/main.go
@@ -91,7 +91,7 @@
 	}
 
 	// Register the core request handler
-	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter, a.coreProxy); err != nil {
 		log.Fatal("error-setting-core-request-handler")
 	}
 
@@ -218,9 +218,9 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter, coreProxy *com.CoreProxy) error {
 	log.Info("setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, coreProxy)
 	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
 		log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
 		return err
diff --git a/common/core/northbound/grpc/default_api_handler.go b/common/core/northbound/grpc/default_api_handler.go
index 0cc76c5..30320a2 100644
--- a/common/core/northbound/grpc/default_api_handler.go
+++ b/common/core/northbound/grpc/default_api_handler.go
@@ -42,6 +42,11 @@
 	return nil, errors.New("UnImplemented")
 }
 
+func (handler *DefaultAPIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
+	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
+	return nil, errors.New("UnImplemented")
+}
+
 func (handler *DefaultAPIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
 	log.Debug("GetVoltha-request")
 	return nil, errors.New("UnImplemented")
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 099d0d8..c5e0772 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -43,6 +43,7 @@
 
 const (
 	TransactionKey = "transactionID"
+	FromTopic      = "fromTopic"
 )
 
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
@@ -331,7 +332,7 @@
 	kp.defaultRequestHandlerInterface = handler
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, handler)
+	go kp.waitForMessages(ch, topic, handler)
 
 	return nil
 }
@@ -342,14 +343,14 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key:Offset, Value:initialOffset}); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+	go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
 
 	return nil
 }
@@ -615,7 +616,21 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   FromTopic,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
+func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -633,6 +648,11 @@
 			// Augment the requestBody with the message Id as it will be used in scenarios where cores
 			// are set in pairs and competing
 			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+
+			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
+			// needs to send an unsollicited message to the currently requested container
+			requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
 				log.Warn(err)
@@ -660,7 +680,7 @@
 						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
-				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil()  {
+				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
 					return // Ignore case - when core is in competing mode
 				} else { // Non-error case
 					success = true
@@ -688,15 +708,19 @@
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
-
+	} else if msg.Header.Type == ic.MessageType_RESPONSE {
+		log.Debugw("response-received", log.Fields{"msg": msg})
+		go kp.dispatchResponse(msg)
+	} else {
+		log.Warnw("unsupported-message-received", log.Fields{"msg": msg})
 	}
 }
 
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
-		go kp.handleRequest(msg, targetInterface)
+		go kp.handleMessage(msg, targetInterface)
 	}
 }
 
@@ -710,33 +734,6 @@
 	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
 }
 
-// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
-	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
-startloop:
-	for {
-		select {
-		case msg, ok := <-subscribedCh:
-			if !ok {
-				log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
-				break startloop
-			}
-			log.Debugw("message-received", log.Fields{"msg": msg})
-			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
-			if msg.Header.Type == ic.MessageType_RESPONSE {
-				go kp.dispatchResponse(msg)
-			}
-		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
-	//	We got an exit signal.  Unsubscribe to the channel
-	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
-}
-
 // subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
@@ -744,26 +741,6 @@
 func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
-	// First check whether we already have a channel listening for response on that topic.  If there is
-	// already one then it will be reused.  If not, it will be created.
-	if !kp.isTopicSubscribedForResponse(topic.Name) {
-		log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
-		var subscribedCh <-chan *ic.InterContainerMessage
-		var err error
-		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
-			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
-			return nil, err
-		}
-		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
-		go kp.waitForResponseLoop(subscribedCh, &topic)
-
-		//	Wait until topic is ready - it takes on average 300 ms for a topic to be created.  This is a one time
-		//	delay everything a device is created.
-		// TODO:  Implement a mechanism to determine when a topic is ready instead of relying on a timeout
-		//kp.kafkaClient.WaitForTopicToBeReady
-		time.Sleep(400 * time.Millisecond)
-	}
-
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ic.InterContainerMessage)
diff --git a/kafka/utils.go b/kafka/utils.go
index beac9f9..0cb9535 100644
--- a/kafka/utils.go
+++ b/kafka/utils.go
@@ -15,10 +15,7 @@
  */
 package kafka
 
-import (
-	"fmt"
-	"strings"
-)
+import "strings"
 
 const (
 	TopicSeparator = "_"
@@ -39,19 +36,7 @@
 	Value interface{}
 }
 
-//CreateSubTopic concatenate a list of arguments together using underscores.
-func CreateSubTopic(args ...string) Topic {
-	topic := ""
-	for index, arg := range args {
-		if index == 0 {
-			topic = arg
-		} else {
-			topic = fmt.Sprintf("%s%s%s", topic, TopicSeparator, arg)
-		}
-	}
-	return Topic{Name: topic}
-}
-
+// TODO:  Remove and provide better may to get the device id
 // GetDeviceIdFromTopic extract the deviceId from the topic name.  The topic name is formatted either as:
 //			<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
 func GetDeviceIdFromTopic(topic Topic) string {
diff --git a/protos/voltha.proto b/protos/voltha.proto
index fdff7f6..ef6eedf 100644
--- a/protos/voltha.proto
+++ b/protos/voltha.proto
@@ -137,6 +137,15 @@
     string voltha_id = 2;
 }
 
+// Identifies a membership group a Core belongs to
+message Membership {
+    //  Group name
+    string group_name = 1;
+
+    // Unique ID of a container within that group
+    string id = 2;
+}
+
 /*
  * Voltha APIs
  *
@@ -150,6 +159,14 @@
         };
     }
 
+    // Set the membership group of a Voltha Core
+    rpc UpdateMembership(Membership) returns(google.protobuf.Empty) {
+        option (google.api.http) = {
+            post: "/api/v1/membership"
+            body: "*"
+        };
+    }
+
     // Get high level information on the Voltha cluster
     rpc GetVoltha(google.protobuf.Empty) returns(Voltha) {
         option (google.api.http) = {
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index fccb049..68344fc 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -30,7 +30,7 @@
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
-    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST, ARG_FROM_TOPIC
 
 log = structlog.get_logger()
 
@@ -56,8 +56,9 @@
     defined in
     """
 
-    def __init__(self, adapter):
+    def __init__(self, adapter, core_proxy):
         self.adapter = adapter
+        self.core_proxy = core_proxy
 
     @inlineCallbacks
     def start(self):
@@ -67,24 +68,32 @@
     def stop(self):
         log.debug('stopping')
 
-    @inlineCallbacks
-    def createKafkaDeviceTopic(self, deviceId):
-        log.debug("subscribing-to-topic", device_id=deviceId)
-        kafka_proxy = get_messaging_proxy()
-        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        # yield kafka_proxy.create_topic(topic=device_topic)
-        yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
-        log.debug("subscribed-to-topic", topic=device_topic)
+    # @inlineCallbacks
+    # def createKafkaDeviceTopic(self, deviceId):
+    #     log.debug("subscribing-to-topic", device_id=deviceId)
+    #     kafka_proxy = get_messaging_proxy()
+    #     device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+    #     # yield kafka_proxy.create_topic(topic=device_topic)
+    #     yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
+    #     log.debug("subscribed-to-topic", topic=device_topic)
 
-    def adopt_device(self, device):
+    def adopt_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
 
-            # Start the creation of a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device.
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+            # Update the core reference for that device as it will be used
+            # by the adapter to send async messages to the Core.
+            if ARG_FROM_TOPIC in kwargs:
+                t = StrType()
+                kwargs[ARG_FROM_TOPIC].Unpack(t)
+                # Update the core reference for that device
+                self.core_proxy.update_device_core_reference(d.id, t.val)
+
+            # # Start the creation of a device specific topic to handle all
+            # # subsequent requests from the Core. This adapter instance will
+            # # handle all requests for that device.
+            # reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
 
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
@@ -94,7 +103,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_device_info(self, device):
+    def get_ofp_device_info(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -103,7 +112,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_ofp_port_info(self, device, port_no):
+    def get_ofp_port_info(self, device, port_no, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -119,13 +128,13 @@
 
         return True, self.adapter.get_ofp_port_info(d, p.val)
 
-    def reconcile_device(self, device):
+    def reconcile_device(self, device, **kwargs):
         return self.adapter.reconcile_device(device)
 
-    def abandon_device(self, device):
+    def abandon_device(self, device, **kwargs):
         return self.adapter.abandon_device(device)
 
-    def disable_device(self, device):
+    def disable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -134,7 +143,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reenable_device(self, device):
+    def reenable_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -143,7 +152,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def reboot_device(self, device):
+    def reboot_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -152,7 +161,7 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def download_image(self, device, request):
+    def download_image(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -168,7 +177,7 @@
 
         return True, self.adapter.download_image(device, request)
 
-    def get_image_download_status(self, device, request):
+    def get_image_download_status(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -184,7 +193,7 @@
 
         return True, self.adapter.get_image_download_status(device, request)
 
-    def cancel_image_download(self, device, request):
+    def cancel_image_download(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -200,7 +209,7 @@
 
         return True, self.adapter.cancel_image_download(device, request)
 
-    def activate_image_update(self, device, request):
+    def activate_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -216,7 +225,7 @@
 
         return True, self.adapter.activate_image_update(device, request)
 
-    def revert_image_update(self, device, request):
+    def revert_image_update(self, device, request, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -233,10 +242,10 @@
         return True, self.adapter.revert_image_update(device, request)
 
 
-    def self_test(self, device):
+    def self_test(self, device, **kwargs):
         return self.adapter.self_test_device(device)
 
-    def delete_device(self, device):
+    def delete_device(self, device, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -254,10 +263,10 @@
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
 
-    def get_device_details(self, device):
+    def get_device_details(self, device, **kwargs):
         return self.adapter.get_device_details(device)
 
-    def update_flows_bulk(self, device, flows, groups):
+    def update_flows_bulk(self, device, flows, groups, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -274,7 +283,7 @@
 
         return (True, self.adapter.update_flows_bulk(d, f, g))
 
-    def update_flows_incrementally(self, device, flow_changes, group_changes):
+    def update_flows_incrementally(self, device, flow_changes, group_changes, **kwargs):
         d = Device()
         if device:
             device.Unpack(d)
@@ -291,13 +300,13 @@
 
         return (True, self.adapter.update_flows_incrementally(d, f, g))
 
-    def suppress_alarm(self, filter):
+    def suppress_alarm(self, filter, **kwargs):
         return self.adapter.suppress_alarm(filter)
 
-    def unsuppress_alarm(self, filter):
+    def unsuppress_alarm(self, filter, **kwargs):
         return self.adapter.unsuppress_alarm(filter)
 
-    def process_inter_adapter_message(self, msg):
+    def process_inter_adapter_message(self, msg, **kwargs):
         m = InterAdapterMessage()
         if msg:
             msg.Unpack(m)
@@ -308,7 +317,7 @@
         return (True, self.adapter.process_inter_adapter_message(m))
 
 
-    def receive_packet_out(self, deviceId, outPort, packet):
+    def receive_packet_out(self, deviceId, outPort, packet, **kwargs):
         try:
             d_id = StrType()
             if deviceId:
diff --git a/python/adapters/kafka/container_proxy.py b/python/adapters/kafka/container_proxy.py
index efcf558..bd873d8 100644
--- a/python/adapters/kafka/container_proxy.py
+++ b/python/adapters/kafka/container_proxy.py
@@ -38,10 +38,10 @@
 @implementer(IComponent)
 class ContainerProxy(object):
 
-    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+    def __init__(self, kafka_proxy, remote_topic, my_listening_topic):
         self.kafka_proxy = kafka_proxy
         self.listening_topic = my_listening_topic
-        self.core_topic = core_topic
+        self.remote_topic = remote_topic
         self.default_timeout = 3
 
     def start(self):
@@ -92,7 +92,7 @@
                           to_topic=to_topic,
                           reply_topic=reply_topic)
                 if to_topic is None:
-                    to_topic = self.core_topic
+                    to_topic = self.remote_topic
                 if reply_topic is None:
                     reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
@@ -130,4 +130,4 @@
                     raise e
                 retry += 1
                 if retry == max_retry:
-                    to_topic = self.core_topic
+                    to_topic = self.remote_topic
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index b897188..8d252a3 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -35,9 +35,27 @@
 
 class CoreProxy(ContainerProxy):
 
-    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
-        super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+    def __init__(self, kafka_proxy, default_core_topic, my_listening_topic):
+        super(CoreProxy, self).__init__(kafka_proxy, default_core_topic,
                                         my_listening_topic)
+        self.core_default_topic = default_core_topic
+        self.deviceId_to_core_map = dict()
+
+    def update_device_core_reference(self, device_id, core_topic):
+        log.debug("update_device_core_reference")
+        self.deviceId_to_core_map[device_id] = core_topic
+
+    def delete_device_core_reference(self, device_id, core_topic):
+        log.debug("delete_device_core_reference")
+        del self.deviceId_to_core_map[device_id]
+
+    def get_adapter_topic(self, **kwargs):
+        return self.listening_topic
+
+    def get_core_topic(self, device_id):
+        if device_id in self.deviceId_to_core_map:
+            return self.deviceId_to_core_map[device_id]
+        return self.core_default_topic
 
     @ContainerProxy.wrap_request(CoreInstance)
     @inlineCallbacks
@@ -62,8 +80,11 @@
         # Once we have a device being managed, all communications between the
         # the adapter and the core occurs over a topic associated with that
         # device
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetDevice",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -82,8 +103,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -132,8 +156,11 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-        to_topic = createSubTopic(self.core_topic, parent_device_id)
-        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        to_topic = self.get_core_topic(parent_device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, parent_device_id)
+        # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
                                 to_topic=to_topic,
@@ -149,8 +176,11 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        to_topic = createSubTopic(self.core_topic, device.id)
-        reply_topic = createSubTopic(self.listening_topic, device.id)
+        to_topic = self.get_core_topic(device.id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device.id)
+        # reply_topic = createSubTopic(self.listening_topic, device.id)
         res = yield self.invoke(rpc="DeviceUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -178,8 +208,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        #     reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -206,8 +239,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -232,8 +268,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -262,8 +301,11 @@
         else:
             c_status.val = -1
 
-        to_topic = createSubTopic(self.core_topic, parent_device_id)
-        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        to_topic = self.get_core_topic(parent_device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, parent_device_id)
+        # reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -281,8 +323,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
-        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
-        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+        to_topic = self.get_core_topic(device_pm_config.id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        # reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -296,8 +341,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
@@ -333,8 +381,10 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
-        to_topic = createSubTopic(self.core_topic, device_id)
-        reply_topic = createSubTopic(self.listening_topic, device_id)
+        to_topic = self.get_core_topic(device_id)
+        reply_topic = self.get_adapter_topic()
+        # to_topic = createSubTopic(self.core_topic, device_id)
+        # reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
                                 to_topic=to_topic,
                                 reply_topic=reply_topic,
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 5cad2e8..12a5dbe 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -28,12 +28,13 @@
 from kafka_proxy import KafkaProxy, get_kafka_proxy
 from python.protos.inter_container_pb2 import MessageType, Argument, \
     InterContainerRequestBody, InterContainerMessage, Header, \
-    InterContainerResponseBody
+    InterContainerResponseBody, StrType
 
 log = structlog.get_logger()
 
 KAFKA_OFFSET_LATEST = 'latest'
 KAFKA_OFFSET_EARLIEST = 'earliest'
+ARG_FROM_TOPIC = 'fromTopic'
 
 
 class KafkaMessagingError(BaseException):
@@ -406,6 +407,13 @@
         from Kafka.
         """
 
+        def _augment_args_with_FromTopic(args, from_topic):
+            arg = Argument(key=ARG_FROM_TOPIC)
+            t = StrType(val=from_topic)
+            arg.value.Pack(t)
+            args.extend([arg])
+            return args
+
         def _toDict(args):
             """
             Convert a repeatable Argument type into a python dictionary
@@ -451,12 +459,15 @@
                     log.debug("unsupported-msg", msg_type=type(message.body))
                     return
                 if targetted_topic in self.topic_target_cls_map:
-                    if msg_body.args:
+                    # Augment the request arguments with the from_topic
+                    augmented_args = _augment_args_with_FromTopic(msg_body.args,
+                                                        msg_body.reply_to_topic)
+                    if augmented_args:
                         log.debug("message-body-args-present", body=msg_body)
                         (status, res) = yield getattr(
                             self.topic_target_cls_map[targetted_topic],
                             self._to_string(msg_body.rpc))(
-                            **_toDict(msg_body.args))
+                            **_toDict(augmented_args))
                     else:
                         log.debug("message-body-args-absent", body=msg_body,
                                   rpc=msg_body.rpc)
diff --git a/python/adapters/ponsim_olt/main.py b/python/adapters/ponsim_olt/main.py
index 86b6be0..e3c5b7f 100755
--- a/python/adapters/ponsim_olt/main.py
+++ b/python/adapters/ponsim_olt/main.py
@@ -359,7 +359,7 @@
 
             self.core_proxy = CoreProxy(
                 kafka_proxy=None,
-                core_topic=self.core_topic,
+                default_core_topic=self.core_topic,
                 my_listening_topic=self.listening_topic)
 
             self.adapter_proxy = AdapterProxy(
@@ -371,7 +371,8 @@
                                             adapter_proxy=self.adapter_proxy,
                                             config=config)
 
-            ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter)
+            ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter,
+                                                    core_proxy=self.core_proxy)
 
             yield registry.register(
                 'kafka_adapter_proxy',
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index ff5ee00..4f5af65 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -315,6 +315,7 @@
                 hw_desc='ponsim pon',
                 sw_desc='ponsim pon',
                 serial_num=device.serial_number,
+                mfr_desc="VOLTHA Project",
                 dp_desc='n/a'
             ),
             switch_features=ofp_switch_features(
diff --git a/python/adapters/ponsim_onu/main.py b/python/adapters/ponsim_onu/main.py
index c4c12db..79320c2 100755
--- a/python/adapters/ponsim_onu/main.py
+++ b/python/adapters/ponsim_onu/main.py
@@ -358,7 +358,7 @@
 
             self.core_proxy = CoreProxy(
                 kafka_proxy=None,
-                core_topic=self.core_topic,
+                default_core_topic=self.core_topic,
                 my_listening_topic=self.listening_topic)
 
             self.adapter_proxy = AdapterProxy(
@@ -370,7 +370,7 @@
                 core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                 config=config)
             ponsim_request_handler = AdapterRequestFacade(
-                adapter=self.adapter)
+                adapter=self.adapter, core_proxy=self.core_proxy)
 
             yield registry.register(
                 'kafka_adapter_proxy',
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ed09c9d..b3ba10d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -30,12 +30,15 @@
 
 type AdapterProxy struct {
 	TestMode     bool
+	deviceTopicRegistered bool
+	coreTopic *kafka.Topic
 	kafkaICProxy *kafka.InterContainerProxy
 }
 
 func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
 	proxy.kafkaICProxy = kafkaProxy
+	proxy.deviceTopicRegistered = false
 	return &proxy
 }
 
@@ -55,22 +58,42 @@
 	}
 }
 
+func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
+	ap.coreTopic = coreTopic
+}
+
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+	if ap.coreTopic != nil {
+		return *ap.coreTopic
+	}
+	return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+}
+
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+	return kafka.Topic{Name: adapterName}
+}
+
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
-	topic := kafka.Topic{Name: device.Adapter}
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	//topic := kafka.Topic{Name: device.Adapter}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
-	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
-		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-		return err
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	replyToTopic := ap.getCoreTopic()
+	//if !ap.deviceTopicRegistered {
+	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
+	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+	//		return err
+	//	}
+	//}
+	ap.deviceTopicRegistered = true
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -78,16 +101,19 @@
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
 	// specific topic
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	//toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -96,14 +122,14 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -112,14 +138,14 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -128,14 +154,14 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 
@@ -150,14 +176,14 @@
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
@@ -181,7 +207,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -193,7 +219,7 @@
 		Value: pNo,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
@@ -250,7 +276,7 @@
 func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "download_image"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -261,7 +287,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -271,7 +297,7 @@
 func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -282,7 +308,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -308,7 +334,7 @@
 func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -319,7 +345,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -329,7 +355,7 @@
 func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -340,7 +366,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -350,7 +376,7 @@
 func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
 	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -361,7 +387,7 @@
 		Value: download,
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
@@ -375,7 +401,7 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
+	toTopic := ap.getAdapterTopic(deviceType)
 	rpc := "receive_packet_out"
 	dId := &ic.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
@@ -395,15 +421,15 @@
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -420,7 +446,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
@@ -428,7 +454,7 @@
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
+	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -445,7 +471,7 @@
 	}
 
 	// Use a device specific topic as we are the only core handling requests for this device
-	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	replyToTopic := ap.getCoreTopic()
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 88b622e..1be0cff 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -81,7 +81,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -130,7 +130,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -198,7 +198,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -292,7 +292,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -330,7 +330,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
-	if len(args) != 2 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -623,7 +623,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -674,7 +674,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 3 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 838235d..d576a6e 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,6 +17,8 @@
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
@@ -43,6 +45,7 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
+	coreMembershipRegistered bool
 }
 
 func init() {
@@ -72,6 +75,7 @@
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+	core.coreMembershipRegistered = false
 	return &core
 }
 
@@ -84,7 +88,8 @@
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+
+	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
 	}
 	go core.startDeviceManager(ctx)
@@ -113,7 +118,8 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	//core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	core.grpcNBIAPIHandler = NewAPIHandler(core)
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
@@ -154,17 +160,57 @@
 	return nil
 }
 
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+//	) error {
+//	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+//		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+//	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+//
+//	log.Info("request-handlers")
+//	return nil
+//}
+
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
-	) error {
+) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
 		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
-	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
-	log.Info("request-handlers")
+	// Register the broadcast topic to handle any core-bound broadcast requests
+	if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
+		log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
+		return err
+	}
+
+	log.Info("request-handler-registered")
 	return nil
 }
 
+
+func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+	// Sanity check - can only register once
+	if core.coreMembershipRegistered {
+		return errors.New("Can-only-set-once")
+	}
+
+	go func(coreTopicSuffix string) {
+		// Register the core-pair topic to handle core-bound requests aimed to the core pair
+		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+		// TODO: Set a retry here to ensure this subscription happens
+		if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
+			log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
+		}
+		// Update the CoreTopic to use by the adapter proxy
+		core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
+	}(coreTopicSuffix)
+
+	core.coreMembershipRegistered = true
+	log.Info("request-handlers-registered")
+	return nil
+}
+
+
 func (core *Core) startDeviceManager(ctx context.Context) {
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
 	// callbacks.  For now, until the model is ready, devicemanager will keep a reference to the
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index f72d615..46382c2 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -60,18 +60,34 @@
 	longRunningRequestTimeout int64
 	defaultRequestTimeout int64
 	da.DefaultAPIHandler
+	core *Core
 }
 
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+//	handler := &APIHandler{
+//		deviceMgr:        deviceMgr,
+//		logicalDeviceMgr: lDeviceMgr,
+//		adapterMgr:adapterMgr,
+//		coreInCompetingMode:inCompetingMode,
+//		longRunningRequestTimeout:longRunningRequestTimeout,
+//		defaultRequestTimeout:defaultRequestTimeout,
+//		// TODO: Figure out what the 'hint' parameter to queue.New does
+//		packetInQueue: queue.New(10),
+//	}
+//	return handler
+//}
+
+func NewAPIHandler(core *Core) *APIHandler {
 	handler := &APIHandler{
-		deviceMgr:        deviceMgr,
-		logicalDeviceMgr: lDeviceMgr,
-		adapterMgr:adapterMgr,
-		coreInCompetingMode:inCompetingMode,
-		longRunningRequestTimeout:longRunningRequestTimeout,
-		defaultRequestTimeout:defaultRequestTimeout,
+		deviceMgr:        core.deviceMgr,
+		logicalDeviceMgr: core.logicalDeviceMgr,
+		adapterMgr: core.adapterMgr,
+		coreInCompetingMode: core.config.InCompetingMode,
+		longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
+		defaultRequestTimeout:core.config.DefaultRequestTimeout,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
 		packetInQueue: queue.New(10),
+		core: core,
 	}
 	return handler
 }
@@ -186,6 +202,16 @@
 	return out, nil
 }
 
+
+func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
+	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
+	out := new(empty.Empty)
+	if err := handler.core.registerCoreTopic(ctx, membership.GroupName); err != nil {
+		return out, err
+	}
+	return out, nil
+}
+
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 1b2fb1e..e313b77 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -306,7 +306,7 @@
 }
 
 func (ldMgr *LogicalDeviceManager) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
-	log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
+	log.Debugw("AddUNILogicalPort", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId})
 	// Sanity check
 	if childDevice.Root {
 		return errors.New("Device-root")
diff --git a/rw_core/main.go b/rw_core/main.go
index 2a29499..41d595b 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -77,7 +77,7 @@
 			kafka.NumPartitions(3),
 			kafka.ConsumerGroupName(instanceID),
 			kafka.ConsumerGroupPrefix(instanceID),
-			kafka.AutoCreateTopic(false),
+			kafka.AutoCreateTopic(true),
 			kafka.ProducerFlushFrequency(5),
 			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
 	}