This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, OLT adapter and ONU
adapters.  This ensures that only the cores and adapters handling
these devices listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.  This will be dealt with in a
separate update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 6015e7f..7423563 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -34,32 +34,34 @@
 	grpcServer        *grpcserver.GrpcServer
 	grpcNBIAPIHanfler *APIHandler
 	config            *config.RWCoreFlags
-	kmp               *kafka.KafkaMessagingProxy
+	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
 	localDataRoot     model.Root
 	clusterDataProxy  *model.Proxy
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	kafkaClient       kafka.Client
 }
 
 func init() {
 	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
-func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client) *Core {
+func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
 	var core Core
 	core.instanceId = id
 	core.exitChannel = make(chan int, 1)
 	core.config = cf
 	core.kvClient = kvClient
+	core.kafkaClient = kafkaClient
 
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
 	//backend := model.Backend{
-	//	Client:     kvClient,
+	//	MsgClient:     kvClient,
 	//	StoreType:  cf.KVStoreType,
 	//	Host:       cf.KVStoreHost,
 	//	Port:       cf.KVStorePort,
@@ -67,8 +69,8 @@
 	//	PathPrefix: "service/voltha"}
 	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
-	core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
-	core.localDataProxy = core.localDataRoot.GetProxy("/", false)
+	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
+	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
 	return &core
 }
 
@@ -78,7 +80,7 @@
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
+	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
@@ -89,6 +91,11 @@
 func (core *Core) Stop(ctx context.Context) {
 	log.Info("stopping-core")
 	core.exitChannel <- 1
+	// Stop all the started services
+	core.grpcServer.Stop()
+	core.logicalDeviceMgr.stop(ctx)
+	core.deviceMgr.stop(ctx)
+	core.kmp.Stop()
 	log.Info("core-stopped")
 }
 
@@ -120,9 +127,10 @@
 	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 	var err error
-	if core.kmp, err = kafka.NewKafkaMessagingProxy(
-		kafka.KafkaHost(core.config.KafkaAdapterHost),
-		kafka.KafkaPort(core.config.KafkaAdapterPort),
+	if core.kmp, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(core.config.KafkaAdapterHost),
+		kafka.InterContainerPort(core.config.KafkaAdapterPort),
+		kafka.MsgClient(core.kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
@@ -140,7 +148,7 @@
 func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
 	cdProxy *model.Proxy, ldProxy *model.Proxy) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
-	core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
 	log.Info("request-handlers")
 	return nil