[VOL-4291] Rw-core updates for gRPC migration

This change migrates rw-core's adapter-facing communication from the
Kafka inter-container proxy to gRPC:

- bump voltha-lib-go to v7 and voltha-protos to v5
- split the single gRPC server into a north-bound (NBI) server for
  VOLTHA API requests and a south-bound (SBI) server exposing the
  CoreService consumed by adapters
- keep Kafka for events only: the event client is now created in
  NewCore, while the adapter message bus, the inter-container proxy
  and the adapter request handlers are removed
- rename the readiness-probe services accordingly, and wait for the
  connections to all registered adapters to come up before serving
  NBI requests after a restart
- NewCore now returns the Core together with a cancelable context;
  the exported Start() is invoked by the caller
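
A minimal sketch of the expected caller flow under the new lifecycle
(the caller's main.go is not part of this change; the package alias
"c", instanceID and cf are illustrative):

    // c is the rw_core/core package; cf holds the parsed RWCoreFlags
    rwCore, shutdownCtx := c.NewCore(ctx, instanceID, cf)
    go rwCore.Start(shutdownCtx, instanceID, cf)

    // ... run until a stop signal is received ...

    rwCore.Stop() // cancels the shutdown context and waits on Stopped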

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c52f170..3fa68ef 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,65 +25,77 @@
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/api"
 	"github.com/opencord/voltha-go/rw_core/core/device"
-	conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
-	grpcserver "github.com/opencord/voltha-lib-go/v5/pkg/grpc"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-lib-go/v5/pkg/probe"
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events"
+	grpcserver "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-protos/v5/go/core"
+	"github.com/opencord/voltha-protos/v5/go/extension"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
 )
 
 // Core represents the read/write core attributes
 type Core struct {
-	shutdown context.CancelFunc
-	stopped  chan struct{}
+	Shutdown    context.CancelFunc
+	Stopped     chan struct{}
+	KafkaClient kafka.Client
 }
 
 const (
-	adapterMessageBus = "adapter-message-bus"
-	clusterMessageBus = "cluster-message-bus"
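+	// service names registered with the readiness probe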
+	clusterMessagingService = "cluster-message-service"
+	grpcNBIService          = "grpc-nbi-service"
+	grpcSBIService          = "grpc-sbi-service"
+	adapterService          = "adapter-service"
+	kvService               = "kv-service"
+	deviceService           = "device-service"
+	logicalDeviceService    = "logical-device-service"
 )
 
 // NewCore creates an instance of the rw core
-func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
+func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) (*Core, context.Context) {
 	// If the context has a probe then fetch it and register our services
 	if p := probe.GetProbeFromContext(ctx); p != nil {
 		p.RegisterService(
 			ctx,
-			adapterMessageBus,
-			"kv-store",
-			"adapter-manager",
-			"device-manager",
-			"logical-device-manager",
-			"grpc-service",
-			"adapter-request-handler",
+			kvService,
+			adapterService,
+			grpcSBIService,
+			clusterMessagingService,
+			deviceService,
+			logicalDeviceService,
 		)
-
-		if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
-			p.RegisterService(
-				ctx,
-				clusterMessageBus,
-			)
-		}
 	}
 
+	// create kafka client for events
+	kafkaClient := kafka.NewSaramaClient(
+		kafka.Address(cf.KafkaClusterAddress),
+		kafka.ProducerReturnOnErrors(true),
+		kafka.ProducerReturnOnSuccess(true),
+		kafka.ProducerMaxRetries(6),
+		kafka.ProducerRetryBackoff(time.Millisecond*30),
+		kafka.AutoCreateTopic(true),
+		kafka.MetadatMaxRetries(15),
+	)
+
 	// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
 	shutdownCtx, cancelCtx := context.WithCancel(ctx)
 
-	core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})}
-	go core.start(shutdownCtx, id, cf)
-	return core
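+	// note: the caller is now responsible for invoking Start() with the returned cancelable context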
+	rwCore := &Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: kafkaClient}
+	return rwCore, shutdownCtx
 }
 
-func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) {
+func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) {
 	logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
 
 	// deferred functions are used to run cleanup
 	// failing partway will stop anything that's been started
-	defer close(core.stopped)
-	defer core.shutdown()
+	defer close(core.Stopped)
+	defer core.Shutdown()
 
 	logger.Info(ctx, "starting-rw-core-components")
 
@@ -104,91 +114,69 @@
 	backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
 
 	// wait until connection to KV Store is up
-	if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
+	if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
 		logger.Fatal(ctx, "unable-to-connect-to-kv-store")
 	}
-	go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+	go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
 
-	// create kafka client
-	kafkaClient := kafka.NewSaramaClient(
-		kafka.Address(cf.KafkaAdapterAddress),
-		kafka.ConsumerType(kafka.GroupCustomer),
-		kafka.ProducerReturnOnErrors(true),
-		kafka.ProducerReturnOnSuccess(true),
-		kafka.ProducerMaxRetries(6),
-		kafka.NumPartitions(3),
-		kafka.ConsumerGroupName(id),
-		kafka.ConsumerGroupPrefix(id),
-		kafka.AutoCreateTopic(true),
-		kafka.ProducerFlushFrequency(5),
-		kafka.ProducerRetryBackoff(time.Millisecond*30),
-		kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
-	)
-
-	// create kafka client for events
-	kafkaClientEvent := kafka.NewSaramaClient(
-		kafka.Address(cf.KafkaClusterAddress),
-		kafka.ProducerReturnOnErrors(true),
-		kafka.ProducerReturnOnSuccess(true),
-		kafka.ProducerMaxRetries(6),
-		kafka.ProducerRetryBackoff(time.Millisecond*30),
-		kafka.AutoCreateTopic(true),
-		kafka.MetadatMaxRetries(15),
-	)
-
-	// create event proxy
-	updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress
-	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
-	if err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
-		return
+	// Start Kafka communications and create the required artefacts
+	if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
+		logger.Fatal(ctx, "unable-to-connect-to-kafka")
 	}
-	if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
-		// if we're using a single kafka cluster we don't need two liveliness probes on the same cluster
-		go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
-	}
+	defer core.KafkaClient.Stop(ctx)
 
-	defer stopEventProxy(ctx, kafkaClientEvent, eventProxy)
+	// Create the event proxy to post events to Kafka
+	eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
+	go func() {
+		if err := eventProxy.Start(); err != nil {
+			logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+		}
+	}()
+	defer eventProxy.Stop()
+
+	// Start the kafka monitoring routine
+	go kafka.MonitorKafkaReadiness(ctx, core.KafkaClient, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessagingService)
 
 	// create kv path
 	dbPath := model.NewDBPath(backend)
 
 	// load adapters & device types while other things are starting
-	adapterMgr := adapter.NewAdapterManager(ctx, dbPath, id, kafkaClient)
-	go adapterMgr.Start(ctx)
-
-	// connect to kafka, then wait until reachable and publisher/consumer created
-	// core.kmp must be created before deviceMgr and adapterMgr
-	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
-	if err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
-		return
-	}
-	defer kmp.Stop(ctx)
-	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
+	adapterMgr := adapter.NewAdapterManager(dbPath, id, backend, cf.LiveProbeInterval)
+	adapterMgr.Start(ctx, adapterService)
 
 	// create the core of the system, the device managers
-	endpointMgr := kafka.NewEndpointManager(backend)
-	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf, id, eventProxy)
+	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy)
 
 	// Start the device manager to load the devices. Wait until it completes to prevent a duplicate
 	// load from being triggered by logicalDeviceMgr.Start(ctx)
-	deviceMgr.Start(ctx)
+	err = deviceMgr.Start(ctx, deviceService)
+	if err != nil {
+		logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err})
+	}
 
 	// Start the logical device manager to load the logical devices.
-	logicalDeviceMgr.Start(ctx)
+	logicalDeviceMgr.Start(ctx, logicalDeviceService)
 
-	// register kafka RPC handler
-	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
+	// Create and start the SBI gRPC service
+	grpcSBIServer := grpcserver.NewGrpcServer(cf.GrpcSBIAddress, nil, false, probe.GetProbeFromContext(ctx))
+	go startGrpcSbiService(ctx, grpcSBIServer, grpcSBIService, api.NewAPIHandler(deviceMgr, nil, adapterMgr))
+	defer grpcSBIServer.Stop()
 
-	// start gRPC handler
-	grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
+	// In the case of a restart, let's wait until all the registered adapters are connected to the Core
+	// before starting the grpc server that handles NBI requests.
+	err = adapterMgr.WaitUntilConnectionsToAdaptersAreUp(ctx, cf.ConnectionRetryInterval)
+	if err != nil {
+		logger.Fatalw(ctx, "failure-connecting-to-adapters", log.Fields{"error": err})
+	}
+
+	// Create the NBI gRPC server
+	grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx))
 
 	// Register the 'Extension' service on this gRPC server
-	addGRPCExtensionService(ctx, grpcServer, device.GetNewExtensionManager(deviceMgr))
+	addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr))
 
-	go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
-	defer grpcServer.Stop()
+	go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
+	defer grpcNBIServer.Stop()
 
 	// wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
 	<-ctx.Done()
@@ -196,23 +184,33 @@
 
 // Stop brings down core services
 func (core *Core) Stop() {
-	core.shutdown()
-	<-core.stopped
+	core.Shutdown()
+	<-core.Stopped
 }
 
-// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
-func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) {
-	logger.Info(ctx, "grpc-server-created")
+// startGrpcSbiService creates the gRPC SBI service handler, registers it with the gRPC server and starts the server
+func startGrpcSbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler core.CoreServiceServer) {
+	logger.Infow(ctx, "starting-grpc-sbi-service", log.Fields{"service": serviceName})
+
+	server.AddService(func(server *grpc.Server) { core.RegisterCoreServiceServer(server, handler) })
+	logger.Infow(ctx, "grpc-sbi-service-added", log.Fields{"service": serviceName})
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+	logger.Infow(ctx, "grpc-sbi-server-started", log.Fields{"service": serviceName})
+	server.Start(ctx)
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
+}
+
+// startGrpcNbiService creates the gRPC NBI service handler, registers it with the gRPC server and starts the server
+func startGrpcNbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler voltha.VolthaServiceServer) {
+	logger.Infow(ctx, "starting-grpc-nbi-service", log.Fields{"service": serviceName})
 
 	server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
-	logger.Info(ctx, "grpc-service-added")
+	logger.Infow(ctx, "grpc-nbi-service-added-and-started", log.Fields{"service": serviceName})
 
-	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
-	logger.Info(ctx, "grpc-server-started")
 	// Note that there is a small window here in which the core could return its status as ready,
 	// when it really isn't.  This is unlikely to cause issues, as the delay is incredibly short.
 	server.Start(ctx)
-	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
 }
 
 func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
@@ -221,5 +219,4 @@
 	server.AddService(func(server *grpc.Server) {
 		extension.RegisterExtensionServer(server, handler)
 	})
-
 }
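
For reference (not part of this change): adapters reach the new SBI
endpoint through the client that protoc generates alongside
core.RegisterCoreServiceServer. A minimal, hypothetical dial sketch;
the helper name and dial options are illustrative assumptions:

    import (
        "context"

        "github.com/opencord/voltha-protos/v5/go/core"
        "google.golang.org/grpc"
    )

    // dialCore is a hypothetical helper: it blocks until the SBI endpoint
    // is reachable, then returns the generated CoreService client.
    func dialCore(ctx context.Context, sbiAddress string) (core.CoreServiceClient, *grpc.ClientConn, error) {
        conn, err := grpc.DialContext(ctx, sbiAddress, grpc.WithInsecure(), grpc.WithBlock())
        if err != nil {
            return nil, nil, err
        }
        return core.NewCoreServiceClient(conn), conn, nil
    }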