VOL-2909 - Disaggregating rw_core/core/.
This breaks the core package into logical components (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" which aggregates all of them.
Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 5043d47..de126a2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -23,6 +23,9 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/api"
+ "github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
@@ -38,11 +41,11 @@
// Core represent read,write core attributes
type Core struct {
instanceID string
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
+ deviceMgr *device.Manager
+ logicalDeviceMgr *device.LogicalManager
grpcServer *grpcserver.GrpcServer
- grpcNBIAPIHandler *APIHandler
- adapterMgr *AdapterManager
+ grpcNBIAPIHandler *api.NBIHandler
+ adapterMgr *adapter.Manager
config *config.RWCoreFlags
kmp kafka.InterContainerProxy
clusterDataProxy *model.Proxy
@@ -108,6 +111,8 @@
p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
}
+ endpointMgr := kafka.NewEndpointManager(&core.backend)
+
core.clusterDataProxy = model.NewProxy(&core.backend, "/")
core.localDataProxy = model.NewProxy(&core.backend, "/")
@@ -116,10 +121,8 @@
core.initKafkaManager(ctx)
logger.Debugw("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = newDeviceManager(core)
- core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient, core.deviceMgr)
- core.deviceMgr.adapterMgr = core.adapterMgr
- core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
+ core.adapterMgr = adapter.NewAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient)
+ core.deviceMgr, core.logicalDeviceMgr = device.NewDeviceManagers(core.clusterDataProxy, core.adapterMgr, core.kmp, endpointMgr, core.config.CorePairTopic, core.instanceID, core.config.DefaultCoreTimeout)
// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
// logicalDeviceMgr have been created, as once the kmp is started, it will register
@@ -151,10 +154,10 @@
core.grpcServer.Stop()
}
if core.logicalDeviceMgr != nil {
- core.logicalDeviceMgr.stop(ctx)
+ core.logicalDeviceMgr.Stop(ctx)
}
if core.deviceMgr != nil {
- core.deviceMgr.stop(ctx)
+ core.deviceMgr.Stop(ctx)
}
if core.kmp != nil {
core.kmp.Stop()
@@ -169,9 +172,9 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
logger.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core)
+ core.grpcNBIAPIHandler = api.NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr)
logger.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
- core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
+ core.logicalDeviceMgr.SetEventCallbacks(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
voltha.RegisterVolthaServiceServer(
@@ -355,10 +358,10 @@
return nil
}
-func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *DeviceManager,
- ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *device.Manager,
+ ldMgr *device.LogicalManager, aMgr *adapter.Manager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
- requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceID, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ requestProxy := api.NewAdapterRequestHandlerProxy(coreInstanceID, dMgr, aMgr, cdProxy, ldProxy,
core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
// Register the broadcast topic to handle any core-bound broadcast requests
@@ -379,19 +382,19 @@
func (core *Core) startDeviceManager(ctx context.Context) {
logger.Info("DeviceManager-Starting...")
- core.deviceMgr.start(ctx, core.logicalDeviceMgr)
+ core.deviceMgr.Start(ctx)
logger.Info("DeviceManager-Started")
}
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
logger.Info("Logical-DeviceManager-Starting...")
- core.logicalDeviceMgr.start(ctx)
+ core.logicalDeviceMgr.Start(ctx)
logger.Info("Logical-DeviceManager-Started")
}
func (core *Core) startAdapterManager(ctx context.Context) {
logger.Info("Adapter-Manager-Starting...")
- err := core.adapterMgr.start(ctx)
+ err := core.adapterMgr.Start(ctx)
if err != nil {
logger.Fatalf("failed-to-start-adapter-manager: error %v ", err)
}