[VOL-4292] OpenOLT Adapter changes for gRPC migration

Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index b4fe6dd..148f3bd 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -26,36 +26,43 @@
 	"syscall"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v6/pkg/adapters"
-	"github.com/opencord/voltha-lib-go/v6/pkg/adapters/adapterif"
-	com "github.com/opencord/voltha-lib-go/v6/pkg/adapters/common"
-	conf "github.com/opencord/voltha-lib-go/v6/pkg/config"
-	"github.com/opencord/voltha-lib-go/v6/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v6/pkg/events"
-	"github.com/opencord/voltha-lib-go/v6/pkg/events/eventif"
-	"github.com/opencord/voltha-lib-go/v6/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v6/pkg/log"
-	"github.com/opencord/voltha-lib-go/v6/pkg/probe"
-	"github.com/opencord/voltha-lib-go/v6/pkg/version"
+	"github.com/golang/protobuf/ptypes/empty"
+	conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-lib-go/v7/pkg/version"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
 	ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/core"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc"
+)
+
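+// Names of the services registered with the health probe; they are reused when updating each service's status.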
+const (
+	clusterMessagingService = "cluster-message-service"
+	oltAdapterService       = "olt-adapter-service"
+	kvService               = "kv-service"
+	coreService             = "core-service"
 )
 
 type adapter struct {
-	instanceID       string
-	config           *config.AdapterFlags
-	iAdapter         adapters.IAdapter
-	kafkaClient      kafka.Client
-	kvClient         kvstore.Client
-	kip              kafka.InterContainerProxy
-	coreProxy        adapterif.CoreProxy
-	adapterProxy     adapterif.AdapterProxy
-	eventProxy       eventif.EventProxy
-	halted           bool
-	exitChannel      chan int
-	receiverChannels []<-chan *ic.InterContainerMessage
+	instanceID  string
+	config      *config.AdapterFlags
+	grpcServer  *vgrpc.GrpcServer
+	oltAdapter  *ac.OpenOLT
+	kafkaClient kafka.Client
+	kvClient    kvstore.Client
+	coreClient  *vgrpc.Client
+	eventProxy  eventif.EventProxy
+	halted      bool
+	exitChannel chan int
 }
 
 func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -64,7 +71,6 @@
 	a.config = cf
 	a.halted = false
 	a.exitChannel = make(chan int, 1)
-	a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
 	return &a
 }
 
@@ -78,11 +84,10 @@
 			p = value.(*probe.Probe)
 			p.RegisterService(
 				ctx,
-				"message-bus",
-				"kv-store",
-				"container-proxy",
-				"core-request-handler",
-				"register-with-core",
+				clusterMessagingService,
+				kvService,
+				oltAdapterService,
+				coreService,
 			)
 		}
 	}
@@ -94,7 +99,7 @@
 	}
 
 	if p != nil {
-		p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
+		p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
 	}
 
 	// Setup Log Config
@@ -104,42 +109,52 @@
 	go conf.StartLogFeaturesConfigProcessing(cm, ctx)
 
 	// Setup Kafka Client
-	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
+	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
 		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
 	}
 
-	if p != nil {
-		p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
+	// Start kafka communication with the broker
+	if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
+		logger.Fatal(ctx, "unable-to-connect-to-kafka")
 	}
 
-	// setup endpointManager
-
-	// Start the common InterContainer Proxy - retries indefinitely
-	if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
-		logger.Fatal(ctx, "error-starting-inter-container-proxy")
-	}
-
-	// Create the core proxy to handle requests to the Core
-	a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
-
-	// Create the adaptor proxy to handle request between olt and onu
-	a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
-
 	// Create the event proxy to post events to KAFKA
 	a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
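+	// Start the event proxy in the background; a failure to start is fatal for the adapter.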
+	go func() {
+		if err := a.eventProxy.Start(); err != nil {
+			logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+		}
+	}()
+
+	// Create the Core client to handle requests to the Core. Note that the coreClient is an interface; the concrete
+	// gRPC client is obtained by invoking GetCoreServiceClient on a.coreClient.
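+	// Illustrative (hypothetical) sketch of obtaining the concrete client; the actual call is made in registerWithCore below:
+	//   if client, err := a.coreClient.GetCoreServiceClient(); err == nil && client != nil {
+	//       // client exposes the Core RPCs, e.g. RegisterAdapter and GetHealthStatus
+	//   }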
+	if a.coreClient, err = vgrpc.NewClient(a.config.CoreEndpoint,
+		a.coreRestarted,
+		vgrpc.ActivityCheck(true)); err != nil {
+		logger.Fatal(ctx, "grpc-client-not-created")
+	}
+	// Start the core grpc client
+	go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
 
 	// Create the open OLT adapter
-	if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy, a.config, cm); err != nil {
+	if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
 		logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
 	}
 
-	// Register the core request handler
-	if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
-		logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
-	}
+	// Create and start the grpc server
+	a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
+
+	// Register the adapter service
+	a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
+
+	// Register the olt inter-adapter service
+	a.addOltInterAdapterService(ctx, a.grpcServer, a.oltAdapter)
+
+	// Start the grpc server
+	go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
 
 	// Register this adapter to the Core - retries indefinitely
-	if err = a.registerWithCore(ctx, -1); err != nil {
+	if err = a.registerWithCore(ctx, coreService, -1); err != nil {
 		logger.Fatal(ctx, "error-registering-with-core")
 	}
 
@@ -147,13 +162,28 @@
 	a.checkServicesReadiness(ctx)
 }
 
+// TODO: Any action the adapter needs to take following a Core restart?
+func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
+	logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
+	return nil
+}
+
+// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	svc := core.NewCoreServiceClient(conn)
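+	// Return the client only when the Core reports a HEALTHY state; otherwise return nil.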
+	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+		return nil
+	}
+	return svc
+}
+
 /**
 This function checks the liveliness and readiness of the kafka and kv-client services
 and updates the status in the probe.
 */
 func (a *adapter) checkServicesReadiness(ctx context.Context) {
 	// checks the kafka readiness
-	go a.checkKafkaReadiness(ctx)
+	go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
 
 	// checks the kv-store readiness
 	go a.checkKvStoreReadiness(ctx)
@@ -176,11 +206,11 @@
 		case liveliness := <-kvStoreChannel:
 			if !liveliness {
 				// kv-store not reachable or down, updating the status to not ready state
-				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
 				timeout = a.config.NotLiveProbeInterval
 			} else {
 				// kv-store is reachable , updating the status to running state
-				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
 				timeout = a.config.LiveProbeInterval / 2
 			}
 			// Check if the timer has expired or not
@@ -199,61 +229,6 @@
 	}
 }
 
-/**
-This function checks the liveliness and readiness of the kafka service
-and update the status in the probe.
-*/
-func (a *adapter) checkKafkaReadiness(ctx context.Context) {
-	livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
-	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
-	timeout := a.config.LiveProbeInterval
-	failed := false
-	for {
-		timeoutTimer := time.NewTimer(timeout)
-
-		select {
-		case healthiness := <-healthinessChannel:
-			if !healthiness {
-				// This will eventually cause K8s to restart the container, and will do
-				// so in a way that allows cleanup to continue, rather than an immediate
-				// panic and exit here.
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
-				failed = true
-			}
-			// Check if the timer has expired or not
-			if !timeoutTimer.Stop() {
-				<-timeoutTimer.C
-			}
-		case liveliness := <-livelinessChannel:
-			if failed {
-				// Failures of the message bus are permanent and can't ever be recovered from,
-				// so make sure we never inadvertently reset a failed state back to unready.
-			} else if !liveliness {
-				// kafka not reachable or down, updating the status to not ready state
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-				timeout = a.config.NotLiveProbeInterval
-			} else {
-				// kafka is reachable , updating the status to running state
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-				timeout = a.config.LiveProbeInterval
-			}
-			// Check if the timer has expired or not
-			if !timeoutTimer.Stop() {
-				<-timeoutTimer.C
-			}
-		case <-timeoutTimer.C:
-			logger.Info(ctx, "kafka-proxy-liveness-recheck")
-			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
-			// the liveness probe may wait (and block) writing to our channel.
-			err := a.kafkaClient.SendLiveness(ctx)
-			if err != nil {
-				// Catch possible error case if sending liveness after Sarama has been stopped.
-				logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
-			}
-		}
-	}
-}
-
 func (a *adapter) stop(ctx context.Context) {
 	// Stop leadership tracking
 	a.halted = true
@@ -271,10 +246,21 @@
 		a.kvClient.Close(ctx)
 	}
 
-	if a.kip != nil {
-		a.kip.Stop(ctx)
+	if a.eventProxy != nil {
+		a.eventProxy.Stop()
 	}
 
+	if a.kafkaClient != nil {
+		a.kafkaClient.Stop(ctx)
+	}
+
+	// Stop core client
+	if a.coreClient != nil {
+		a.coreClient.Stop(ctx)
+	}
+
+	// TODO: Stop connections to child devices
+
 	// TODO:  More cleanup
 }
 
@@ -316,39 +302,38 @@
 	return nil
 }
 
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
-	logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"address": a.config.KafkaAdapterAddress,
-		"topic": a.config.Topic})
-	var err error
-	kip := kafka.NewInterContainerProxy(
-		kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
-		kafka.MsgClient(a.kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
-	count := 0
-	for {
-		if err = kip.Start(ctx); err != nil {
-			logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err})
-			if retries == count {
-				return nil, err
-			}
-			count = +1
-			// Take a nap before retrying
-			time.Sleep(2 * time.Second)
-		} else {
-			break
-		}
-	}
-	probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
-	logger.Info(ctx, "common-messaging-proxy-created")
-	return kip, nil
+// startGRPCService starts the grpc server for the given service and updates its status in the probe
+func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
+	logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+	logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
+
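+	// Start blocks while the server is serving; when it returns, mark the service as stopped.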
+	server.Start(ctx)
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
 }
 
-func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
-	cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
+func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.AdapterServiceServer) {
+	logger.Info(ctx, "adding-adapter-service")
+
+	server.AddService(func(gs *grpc.Server) {
+		adapter_services.RegisterAdapterServiceServer(gs, handler)
+	})
+}
+
+func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.OltInterAdapterServiceServer) {
+	logger.Info(ctx, "adding-olt-inter-adapter-service")
+
+	server.AddService(func(gs *grpc.Server) {
+		adapter_services.RegisterOltInterAdapterServiceServer(gs, handler)
+	})
+}
+
+func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
 	cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
 	logger.Info(ctx, "starting-open-olt")
 	var err error
-	sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg, cm)
+	sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
 
 	if err = sOLT.Start(ctx); err != nil {
 		return nil, err
@@ -358,19 +343,7 @@
 	return sOLT, nil
 }
 
-func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
-	logger.Info(ctx, "setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
-	if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
-		return err
-
-	}
-	probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
-	logger.Info(ctx, "request-handler-setup-done")
-	return nil
-}
-
-func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
 	adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
 	logger.Infow(ctx, "registering-with-core", log.Fields{
 		"adapterID":      adapterID,
@@ -381,34 +354,38 @@
 		Id:      adapterID, // Unique name for the device type
 		Vendor:  "VOLTHA OpenOLT",
 		Version: version.VersionInfo.Version,
-		// TODO once we'll be ready to support multiple versions of the OpenOLT adapter
-		// the Endpoint will have to change to `openolt_<currentReplica`>
-		Endpoint:       a.config.Topic,
+		// The Endpoint refers to the address this service is listening on.
+		Endpoint:       a.config.AdapterEndpoint,
 		Type:           "openolt",
 		CurrentReplica: int32(a.config.CurrentReplica),
 		TotalReplicas:  int32(a.config.TotalReplicas),
 	}
 	types := []*voltha.DeviceType{{
 		Id:                          "openolt",
-		Adapter:                     "openolt", // Type of the adapter that handles device type
+		AdapterType:                 "openolt", // Type of the adapter that handles device type
+		Adapter:                     "openolt", // Deprecated attribute
 		AcceptsBulkFlowUpdate:       false,     // Currently openolt adapter does not support bulk flow handling
 		AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
 	count := 0
 	for {
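+		// The Core service client may not be available yet (e.g. the Core is unreachable or not yet healthy); retry until it is or the retry count is exhausted.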
-		if err := a.coreProxy.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), adapterDescription, deviceTypes); err != nil {
-			logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
-			if retries == count {
-				return err
+		gClient, err := a.coreClient.GetCoreServiceClient()
+		if gClient != nil {
+			if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ic.AdapterRegistration{
+				Adapter: adapterDescription,
+				DTypes:  deviceTypes}); err == nil {
+				break
 			}
-			count++
-			// Take a nap before retrying
-			time.Sleep(2 * time.Second)
-		} else {
-			break
 		}
+		logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
+		if retries == count {
+			return err
+		}
+		count++
+		// Take a nap before retrying
+		time.Sleep(2 * time.Second)
 	}
-	probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
 	logger.Info(ctx, "registered-with-core")
 	return nil
 }