[VOL-1499] Use pre-created topics

This commit migrates from dynamically created Kafka topics to
pre-created topics.  The changes are made in the rw_core, the
simulated ONU and OLT adapters, and the ponsim OLT and ONU adapters.
TODO: Move the Python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 838235d..d576a6e 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,6 +17,8 @@
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
@@ -43,6 +45,7 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
+	coreMembershipRegistered bool
 }
 
 func init() {
@@ -72,6 +75,7 @@
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+	core.coreMembershipRegistered = false
 	return &core
 }
 
@@ -84,7 +88,8 @@
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+
+	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
 	}
 	go core.startDeviceManager(ctx)
@@ -113,7 +118,8 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	//core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+	core.grpcNBIAPIHandler = NewAPIHandler(core)
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
@@ -154,17 +160,57 @@
 	return nil
 }
 
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+//	) error {
+//	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+//		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+//	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+//
+//	log.Info("request-handlers")
+//	return nil
+//}
+
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
-	) error {
+) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
 		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
-	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
-	log.Info("request-handlers")
+	// Register the broadcast topic to handle any core-bound broadcast requests
+	if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
+		log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
+		return err
+	}
+
+	log.Info("request-handler-registered")
 	return nil
 }
 
+
+func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+	// Sanity check - can only register once
+	if core.coreMembershipRegistered {
+		return errors.New("Can-only-set-once")
+	}
+
+	go func(coreTopicSuffix string) {
+		// Register the core-pair topic to handle core-bound requests aimed to the core pair
+		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+		// TODO: Set a retry here to ensure this subscription happens
+		if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
+			log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
+		}
+		// Update the core topic to be used by the adapter proxy
+		core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
+	}(coreTopicSuffix)
+
+	core.coreMembershipRegistered = true
+	log.Info("request-handlers-registered")
+	return nil
+}
+
+
 func (core *Core) startDeviceManager(ctx context.Context) {
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
 	// callbacks.  For now, until the model is ready, devicemanager will keep a reference to the