[VOL-1499] Use precreated topic
This commit migrate from dynamically created kafka topic to
pre-created topic. The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 838235d..d576a6e 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,6 +17,8 @@
import (
"context"
+ "errors"
+ "fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
@@ -43,6 +45,7 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
+ coreMembershipRegistered bool
}
func init() {
@@ -72,6 +75,7 @@
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
+ core.coreMembershipRegistered = false
return &core
}
@@ -84,7 +88,8 @@
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+
+ if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
go core.startDeviceManager(ctx)
@@ -113,7 +118,8 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ //core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ core.grpcNBIAPIHandler = NewAPIHandler(core)
core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
@@ -154,17 +160,57 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+// ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+// ) error {
+// requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+// core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+// core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+//
+// log.Info("request-handlers")
+// return nil
+//}
+
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
- ) error {
+) error {
requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
- core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
- log.Info("request-handlers")
+ // Register the broadcast topic to handle any core-bound broadcast requests
+ if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
+ log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
+ return err
+ }
+
+ log.Info("request-handler-registered")
return nil
}
+
+func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+ // Sanity check - can only register once
+ if core.coreMembershipRegistered {
+ return errors.New("Can-only-set-once")
+ }
+
+ go func(coreTopicSuffix string) {
+ // Register the core-pair topic to handle core-bound requests aimed to the core pair
+ pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+ // TODO: Set a retry here to ensure this subscription happens
+ if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
+ log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
+ }
+ // Update the CoreTopic to use by the adapter proxy
+ core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
+ }(coreTopicSuffix)
+
+ core.coreMembershipRegistered = true
+ log.Info("request-handlers-registered")
+ return nil
+}
+
+
func (core *Core) startDeviceManager(ctx context.Context) {
// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the