[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c52f170..3fa68ef 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,65 +25,75 @@
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
- conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
- grpcserver "github.com/opencord/voltha-lib-go/v5/pkg/grpc"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-lib-go/v5/pkg/probe"
- "github.com/opencord/voltha-protos/v4/go/extension"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events"
+ grpcserver "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/core"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
)
// Core represent read,write core attributes
type Core struct {
- shutdown context.CancelFunc
- stopped chan struct{}
+ Shutdown context.CancelFunc
+ Stopped chan struct{}
+ KafkaClient kafka.Client
}
const (
- adapterMessageBus = "adapter-message-bus"
- clusterMessageBus = "cluster-message-bus"
+ clusterMessagingService = "cluster-message-service"
+ grpcNBIService = "grpc-nbi-service"
+ grpcSBIService = "grpc-sbi-service"
+ adapterService = "adapter-service"
+ kvService = "kv-service"
+ deviceService = "device-service"
+ logicalDeviceService = "logical-device-service"
)
// NewCore creates instance of rw core
-func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
+func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) (*Core, context.Context) {
// If the context has a probe then fetch it and register our services
if p := probe.GetProbeFromContext(ctx); p != nil {
p.RegisterService(
ctx,
- adapterMessageBus,
- "kv-store",
- "adapter-manager",
- "device-manager",
- "logical-device-manager",
- "grpc-service",
- "adapter-request-handler",
+ kvService,
+ adapterService,
+ grpcSBIService,
+ clusterMessagingService,
+ deviceService,
+ logicalDeviceService,
)
-
- if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
- p.RegisterService(
- ctx,
- clusterMessageBus,
- )
- }
}
+ // create kafka client for events
+ KafkaClient := kafka.NewSaramaClient(
+ kafka.Address(cf.KafkaClusterAddress),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30),
+ kafka.AutoCreateTopic(true),
+ kafka.MetadatMaxRetries(15),
+ )
+
// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
shutdownCtx, cancelCtx := context.WithCancel(ctx)
- core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})}
- go core.start(shutdownCtx, id, cf)
- return core
+ rwCore := &Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: KafkaClient}
+ return rwCore, shutdownCtx
}
-func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) {
+func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) {
logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
// deferred functions are used to run cleanup
// failing partway will stop anything that's been started
- defer close(core.stopped)
- defer core.shutdown()
+ defer close(core.Stopped)
+ defer core.Shutdown()
logger.Info(ctx, "starting-rw-core-components")
@@ -104,91 +114,69 @@
backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
// wait until connection to KV Store is up
- if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
+ if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
- go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+ go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
- // create kafka client
- kafkaClient := kafka.NewSaramaClient(
- kafka.Address(cf.KafkaAdapterAddress),
- kafka.ConsumerType(kafka.GroupCustomer),
- kafka.ProducerReturnOnErrors(true),
- kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
- kafka.NumPartitions(3),
- kafka.ConsumerGroupName(id),
- kafka.ConsumerGroupPrefix(id),
- kafka.AutoCreateTopic(true),
- kafka.ProducerFlushFrequency(5),
- kafka.ProducerRetryBackoff(time.Millisecond*30),
- kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
- )
-
- // create kafka client for events
- kafkaClientEvent := kafka.NewSaramaClient(
- kafka.Address(cf.KafkaClusterAddress),
- kafka.ProducerReturnOnErrors(true),
- kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
- kafka.ProducerRetryBackoff(time.Millisecond*30),
- kafka.AutoCreateTopic(true),
- kafka.MetadatMaxRetries(15),
- )
-
- // create event proxy
- updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress
- eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
- if err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
- return
+ // Connect the Kafka client and wait until the connection is up
+ if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
+ logger.Fatal(ctx, "unable-to-connect-to-kafka")
}
- if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
- // if we're using a single kafka cluster we don't need two liveliness probes on the same cluster
- go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
- }
+ defer core.KafkaClient.Stop(ctx)
- defer stopEventProxy(ctx, kafkaClientEvent, eventProxy)
+ // Create the event proxy used to post events to Kafka
+ eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
+ go func() {
+ if err := eventProxy.Start(); err != nil {
+ logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+ }
+ }()
+ defer eventProxy.Stop()
+
+ // Start the kafka monitoring routine
+ go kafka.MonitorKafkaReadiness(ctx, core.KafkaClient, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessagingService)
// create kv path
dbPath := model.NewDBPath(backend)
// load adapters & device types while other things are starting
- adapterMgr := adapter.NewAdapterManager(ctx, dbPath, id, kafkaClient)
- go adapterMgr.Start(ctx)
-
- // connect to kafka, then wait until reachable and publisher/consumer created
- // core.kmp must be created before deviceMgr and adapterMgr
- kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
- if err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
- return
- }
- defer kmp.Stop(ctx)
- go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
+ adapterMgr := adapter.NewAdapterManager(dbPath, id, backend, cf.LiveProbeInterval)
+ adapterMgr.Start(ctx, adapterService)
// create the core of the system, the device managers
- endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf, id, eventProxy)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy)
// Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening
// triggered by logicalDeviceMgr.Start(Ctx)
- deviceMgr.Start(ctx)
+ err = deviceMgr.Start(ctx, deviceService)
+ if err != nil {
+ logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err})
+ }
// Start the logical device manager to load the logical devices.
- logicalDeviceMgr.Start(ctx)
+ logicalDeviceMgr.Start(ctx, logicalDeviceService)
- // register kafka RPC handler
- registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
+ // Create and start the SBI gRPC service
+ grpcSBIServer := grpcserver.NewGrpcServer(cf.GrpcSBIAddress, nil, false, probe.GetProbeFromContext(ctx))
+ go startGrpcSbiService(ctx, grpcSBIServer, grpcSBIService, api.NewAPIHandler(deviceMgr, nil, adapterMgr))
+ defer grpcSBIServer.Stop()
- // start gRPC handler
- grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
+ // In the case of a restart, let's wait until all the registered adapters are connected to the Core
+ // before starting the grpc server that handles NBI requests.
+ err = adapterMgr.WaitUntilConnectionsToAdaptersAreUp(ctx, cf.ConnectionRetryInterval)
+ if err != nil {
+ logger.Fatalw(ctx, "failure-connecting-to-adapters", log.Fields{"error": err})
+ }
+
+ // Create the NBI gRPC server
+ grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx))
//Register the 'Extension' service on this gRPC server
- addGRPCExtensionService(ctx, grpcServer, device.GetNewExtensionManager(deviceMgr))
+ addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr))
- go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
- defer grpcServer.Stop()
+ go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
+ defer grpcNBIServer.Stop()
// wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
<-ctx.Done()
@@ -196,23 +184,33 @@
// Stop brings down core services
func (core *Core) Stop() {
- core.shutdown()
- <-core.stopped
+ core.Shutdown()
+ <-core.Stopped
}
-// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
-func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) {
- logger.Info(ctx, "grpc-server-created")
+// startGrpcSbiService registers the core (SBI) service handler with the gRPC server and then starts the server
+func startGrpcSbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler core.CoreServiceServer) {
+ logger.Infow(ctx, "starting-grpc-sbi-service", log.Fields{"service": serviceName})
+
+ server.AddService(func(server *grpc.Server) { core.RegisterCoreServiceServer(server, handler) })
+ logger.Infow(ctx, "grpc-sbi-service-added", log.Fields{"service": serviceName})
+
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "grpc-sbi-server-started", log.Fields{"service": serviceName})
+ server.Start(ctx)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
+}
+
+// startGrpcNbiService registers the NBI service handler with the gRPC server and then starts the server
+func startGrpcNbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler voltha.VolthaServiceServer) {
+ logger.Infow(ctx, "starting-grpc-nbi-service", log.Fields{"service": serviceName})
server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
- logger.Info(ctx, "grpc-service-added")
+ logger.Infow(ctx, "grpc-nbi-service-added-and-started", log.Fields{"service": serviceName})
- probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
- logger.Info(ctx, "grpc-server-started")
// Note that there is a small window here in which the core could return its status as ready,
// when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
server.Start(ctx)
- probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
}
func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
@@ -221,5 +219,4 @@
server.AddService(func(server *grpc.Server) {
extension.RegisterExtensionServer(server, handler)
})
-
}