VOL-2909 - Disaggregating rw_core/core/.

This breaks the core package into logical components (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" itself, which aggregates all of these.

Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 5043d47..de126a2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -23,6 +23,9 @@
 
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/config"
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-go/rw_core/core/api"
+	"github.com/opencord/voltha-go/rw_core/core/device"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
 	grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
@@ -38,11 +41,11 @@
 // Core represent read,write core attributes
 type Core struct {
 	instanceID        string
-	deviceMgr         *DeviceManager
-	logicalDeviceMgr  *LogicalDeviceManager
+	deviceMgr         *device.Manager
+	logicalDeviceMgr  *device.LogicalManager
 	grpcServer        *grpcserver.GrpcServer
-	grpcNBIAPIHandler *APIHandler
-	adapterMgr        *AdapterManager
+	grpcNBIAPIHandler *api.NBIHandler
+	adapterMgr        *adapter.Manager
 	config            *config.RWCoreFlags
 	kmp               kafka.InterContainerProxy
 	clusterDataProxy  *model.Proxy
@@ -108,6 +111,8 @@
 		p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
 	}
 
+	endpointMgr := kafka.NewEndpointManager(&core.backend)
+
 	core.clusterDataProxy = model.NewProxy(&core.backend, "/")
 	core.localDataProxy = model.NewProxy(&core.backend, "/")
 
@@ -116,10 +121,8 @@
 	core.initKafkaManager(ctx)
 
 	logger.Debugw("values", log.Fields{"kmp": core.kmp})
-	core.deviceMgr = newDeviceManager(core)
-	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient, core.deviceMgr)
-	core.deviceMgr.adapterMgr = core.adapterMgr
-	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
+	core.adapterMgr = adapter.NewAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient)
+	core.deviceMgr, core.logicalDeviceMgr = device.NewDeviceManagers(core.clusterDataProxy, core.adapterMgr, core.kmp, endpointMgr, core.config.CorePairTopic, core.instanceID, core.config.DefaultCoreTimeout)
 
 	// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
 	// logicalDeviceMgr have been created, as once the kmp is started, it will register
@@ -151,10 +154,10 @@
 			core.grpcServer.Stop()
 		}
 		if core.logicalDeviceMgr != nil {
-			core.logicalDeviceMgr.stop(ctx)
+			core.logicalDeviceMgr.Stop(ctx)
 		}
 		if core.deviceMgr != nil {
-			core.deviceMgr.stop(ctx)
+			core.deviceMgr.Stop(ctx)
 		}
 		if core.kmp != nil {
 			core.kmp.Stop()
@@ -169,9 +172,9 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
 	logger.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core)
+	core.grpcNBIAPIHandler = api.NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr)
 	logger.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
-	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
+	core.logicalDeviceMgr.SetEventCallbacks(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
 		voltha.RegisterVolthaServiceServer(
@@ -355,10 +358,10 @@
 	return nil
 }
 
-func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *DeviceManager,
-	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *device.Manager,
+	ldMgr *device.LogicalManager, aMgr *adapter.Manager, cdProxy *model.Proxy, ldProxy *model.Proxy,
 ) error {
-	requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceID, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+	requestProxy := api.NewAdapterRequestHandlerProxy(coreInstanceID, dMgr, aMgr, cdProxy, ldProxy,
 		core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
 
 	// Register the broadcast topic to handle any core-bound broadcast requests
@@ -379,19 +382,19 @@
 
 func (core *Core) startDeviceManager(ctx context.Context) {
 	logger.Info("DeviceManager-Starting...")
-	core.deviceMgr.start(ctx, core.logicalDeviceMgr)
+	core.deviceMgr.Start(ctx)
 	logger.Info("DeviceManager-Started")
 }
 
 func (core *Core) startLogicalDeviceManager(ctx context.Context) {
 	logger.Info("Logical-DeviceManager-Starting...")
-	core.logicalDeviceMgr.start(ctx)
+	core.logicalDeviceMgr.Start(ctx)
 	logger.Info("Logical-DeviceManager-Started")
 }
 
 func (core *Core) startAdapterManager(ctx context.Context) {
 	logger.Info("Adapter-Manager-Starting...")
-	err := core.adapterMgr.start(ctx)
+	err := core.adapterMgr.Start(ctx)
 	if err != nil {
 		logger.Fatalf("failed-to-start-adapter-manager: error %v ", err)
 	}