[VOL-4293] OpenONU Adapter update for gRPC migration

Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index c42cef4..ce5dc6f 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -28,38 +28,46 @@
 	"syscall"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
-	com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
-	conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-lib-go/v5/pkg/probe"
-	"github.com/opencord/voltha-lib-go/v5/pkg/version"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/golang/protobuf/ptypes/empty"
+	conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-lib-go/v7/pkg/version"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/core"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc"
+
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
 
 	"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
 	ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
 )
 
+const (
+	clusterMessagingService = "cluster-message-service" // probe service name for the Kafka connection
+	onuAdapterService       = "onu-adapter-service"     // probe service name for this adapter's gRPC server
+	kvService               = "kv-service"              // probe service name for the KV store connection
+	coreService             = "core-service"            // probe service name for the registration with the Core
+)
+
 type adapter struct {
 	//defaultAppName   string
-	instanceID       string
-	config           *config.AdapterFlags
-	iAdapter         adapters.IAdapter // from Voltha interface adapters
-	kafkaClient      kafka.Client
-	kvClient         kvstore.Client
-	kip              kafka.InterContainerProxy
-	coreProxy        adapterif.CoreProxy
-	adapterProxy     adapterif.AdapterProxy
-	eventProxy       eventif.EventProxy
-	halted           bool
-	exitChannel      chan int
-	receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
+	instanceID  string
+	config      *config.AdapterFlags // adapter flags handed to newAdapter
+	kafkaClient kafka.Client         // created by newKafkaClient in start; used by eventProxy
+	kvClient    kvstore.Client       // KV store client; closed in stop
+	eventProxy  eventif.EventProxy   // posts events to Kafka on config.EventTopic
+	grpcServer  *vgrpc.GrpcServer    // serves the adapter and ONU inter-adapter services
+	onuAdapter  *ac.OpenONUAC        // OpenONU adapter core; handler of both gRPC services
+	coreClient  *vgrpc.Client        // gRPC client towards the Core (see registerWithCore)
+	halted      bool                 // set to true by stop
+	exitChannel chan int             // created with a buffer of 1 in newAdapter
 }
 
 func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -68,7 +76,6 @@
 	a.config = cf
 	a.halted = false
 	a.exitChannel = make(chan int, 1)
-	a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
 	return &a
 }
 
@@ -82,11 +89,10 @@
 			p = value.(*probe.Probe)
 			p.RegisterService(
 				ctx,
-				"message-bus",
-				"kv-store",
-				"container-proxy",
-				"core-request-handler",
-				"register-with-core",
+				clusterMessagingService,
+				kvService,
+				onuAdapterService,
+				coreService,
 			)
 		}
 	}
@@ -97,60 +103,84 @@
 		logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
 	}
 
-	if p != nil {
-		p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
-	}
-
 	// Setup Log Config
 	cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
 	go conf.StartLogLevelConfigProcessing(cm, ctx)
 
 	// Setup Kafka Client
-	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
+	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
 		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
 	}
 
-	if p != nil {
-		p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
+	// Start kafka communication with the broker
+	if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
+		logger.Fatal(ctx, "unable-to-connect-to-kafka")
 	}
 
-	// Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
-	if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
-		logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
-		//aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
-		return err
+	// Wait until connection to KV store is established
+	if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
+		logger.Fatal(ctx, "unable-to-connect-to-kv-store")
 	}
 
-	// Create the core proxy to handle requests to the Core
-	a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
-
-	logger.Debugw(ctx, "create adapter proxy", log.Fields{"CoreTopic": a.config.CoreTopic})
-	a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
-
 	// Create the event proxy to post events to KAFKA
 	a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
+	go func() {
+		if err := a.eventProxy.Start(); err != nil {
+			logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+		}
+	}()
+
+	// Create the Core client to handle requests to the Core.  Note that a.coreClient is a generic grpc client wrapper;
+	// the typed Core service client is obtained from it on demand by invoking GetCoreServiceClient on a.coreClient
+	if a.coreClient, err = vgrpc.NewClient(a.config.CoreEndpoint,
+		a.coreRestarted,
+		vgrpc.ActivityCheck(true)); err != nil {
+		logger.Fatal(ctx, "grpc-client-not-created")
+	}
+	// Start the core grpc client
+	go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
 
 	// Create the open ONU interface adapter
-	if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
-		a.config, cm); err != nil {
+	if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
 		logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
 	}
 
-	// Register the core request handler
-	if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
-		logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
-	}
+	// Create the grpc server; it is started further down, once the services have been registered on it
+	a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
+
+	//Register the  adapter  service
+	a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
+
+	//Register the onu inter adapter  service
+	a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
+
+	go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
 
 	// Register this adapter to the Core - retries indefinitely
-	if err = a.registerWithCore(ctx, -1); err != nil {
+	if err = a.registerWithCore(ctx, coreService, -1); err != nil {
 		logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
 	}
 
-	// check the readiness and liveliness and update the probe status
+	// Start the readiness and liveliness check and update the probe status
 	a.checkServicesReadiness(ctx)
 	return err
 }
 
+// coreRestarted is the handler given to vgrpc.NewClient in start; for now it only logs the endpoint it is called with
+func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
+	logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
+	return nil // TODO:  Any action the adapter needs to do following a Core restart?
+}
+
+// setAndTestCoreServiceHandler returns a CoreService client for conn if the Core reports HEALTHY, and nil otherwise
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	svc := core.NewCoreServiceClient(conn)
+	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+		return nil // health query failed or the Core is not healthy: no service client is handed back
+	}
+	return svc
+}
+
 func (a *adapter) stop(ctx context.Context) {
 	// Stop leadership tracking
 	a.halted = true
@@ -168,9 +198,20 @@
 		a.kvClient.Close(ctx)
 	}
 
-	if a.kip != nil {
-		a.kip.Stop(ctx)
+	if a.eventProxy != nil {
+		a.eventProxy.Stop()
 	}
+
+	if a.kafkaClient != nil {
+		a.kafkaClient.Stop(ctx)
+	}
+
+	// Stop core client
+	if a.coreClient != nil {
+		a.coreClient.Stop(ctx)
+	}
+
+	// TODO:  More cleanup
 }
 
 // #############################################
@@ -214,38 +255,10 @@
 	return nil
 }
 
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
-	logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"addr": a.config.KafkaAdapterAddress, "topic": a.config.Topic})
-	var err error
-	/* address config update acc. to [VOL-2736] */
-	kip := kafka.NewInterContainerProxy(
-		kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
-		kafka.MsgClient(a.kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
-	count := 0
-	for {
-		if err = kip.Start(ctx); err != nil {
-			logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
-			if retries == count {
-				return nil, err
-			}
-			count++
-			// Take a nap before retrying
-			time.Sleep(2 * time.Second)
-		} else {
-			break
-		}
-	}
-	probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
-	logger.Info(ctx, "common-messaging-proxy-created")
-	return kip, nil
-}
-
-func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
-	cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
+func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
 	cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
 	var err error
-	sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
+	sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
 
 	if err = sAcONU.Start(ctx); err != nil {
 		logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
@@ -256,20 +269,7 @@
 	return sAcONU, nil
 }
 
-func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
-	logger.Info(ctx, "setting-request-handler")
-	requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
-	if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
-		logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
-		return err
-
-	}
-	probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
-	logger.Info(ctx, "request-handler-setup-done")
-	return nil
-}
-
-func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
 	adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
 	vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
 	logger.Infow(ctx, "registering-with-core", log.Fields{
@@ -282,43 +282,78 @@
 		Id:             adapterID, // Unique name for the device type ->exact type required for OLT comm????
 		Vendor:         "VOLTHA OpenONUGo",
 		Version:        version.VersionInfo.Version,
-		Endpoint:       a.config.Topic,
+		Endpoint:       a.config.AdapterEndpoint,
 		Type:           "brcm_openomci_onu",
 		CurrentReplica: int32(a.config.CurrentReplica),
 		TotalReplicas:  int32(a.config.TotalReplicas),
 	}
 	types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
 		VendorIds:                   vendorIdsList,
-		Adapter:                     "brcm_openomci_onu", // Name of the adapter that handles device type
+		AdapterType:                 "brcm_openomci_onu", // Type of adapter that handles this device type
+		Adapter:                     "brcm_openomci_onu", // Deprecated attribute
 		AcceptsBulkFlowUpdate:       false,               // Currently openolt adapter does not support bulk flow handling
 		AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
 	count := 0
 	for {
-		if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
-			logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
+		gClient, err := a.coreClient.GetCoreServiceClient()
+		// A nil client (Core not connected yet) must take the retry path below too, or the loop spins without a pause
+		if gClient != nil {
+			_, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ic.AdapterRegistration{
+				Adapter: adapterDescription,
+				DTypes:  deviceTypes})
+		}
+		if gClient == nil || err != nil {
+			logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
 			if retries == count {
 				return err
 			}
 			count++
-			// Take a nap before retrying
+			// Take a power nap before retrying
 			time.Sleep(2 * time.Second)
-		} else {
-			break
+			continue
 		}
+		break
 	}
-	probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
 	logger.Info(ctx, "registered-with-core")
 	return nil
 }
 
+// startGRPCService marks serviceName as running in the probe, starts server and marks it stopped once server.Start returns
+func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
+	logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+	logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
+
+	server.Start(ctx) // NOTE(review): presumably blocks for the lifetime of the server - confirm against vgrpc.GrpcServer
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
+}
+
+func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.AdapterServiceServer) {
+	logger.Info(ctx, "adding-adapter-service")
+	// Hand the server a callback that registers handler as the AdapterService on the underlying *grpc.Server
+	server.AddService(func(gs *grpc.Server) {
+		adapter_services.RegisterAdapterServiceServer(gs, handler)
+	})
+}
+
+func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.OnuInterAdapterServiceServer) {
+	logger.Info(ctx, "adding-onu-inter-adapter-service")
+	// Hand the server a callback that registers handler as the OnuInterAdapterService on the underlying *grpc.Server
+	server.AddService(func(gs *grpc.Server) {
+		adapter_services.RegisterOnuInterAdapterServiceServer(gs, handler)
+	})
+}
+
 /**
 This function checks the liveliness and readiness of the kakfa and kv-client services
 and update the status in the probe.
 */
 func (a *adapter) checkServicesReadiness(ctx context.Context) {
 	// checks the kafka readiness
-	go a.checkKafkaReadiness(ctx)
+	go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
 
 	// checks the kv-store readiness
 	go a.checkKvStoreReadiness(ctx)
@@ -333,19 +368,19 @@
 	timeout := a.config.LiveProbeInterval / 2
 	kvStoreChannel := make(chan bool, 1)
 
-	// Default false to check the liveliness.
-	kvStoreChannel <- false
+	// Default true - we are here only after we already had a KV store connection
+	kvStoreChannel <- true
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 		select {
 		case liveliness := <-kvStoreChannel:
 			if !liveliness {
 				// kv-store not reachable or down, updating the status to not ready state
-				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
 				timeout = a.config.NotLiveProbeInterval
 			} else {
 				// kv-store is reachable , updating the status to running state
-				probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
 				timeout = a.config.LiveProbeInterval / 2
 			}
 			// Check if the timer has expired or not
@@ -364,48 +399,28 @@
 	}
 }
 
-/**
-This function checks the liveliness and readiness of the kafka service
-and update the status in the probe.
-*/
-func (a *adapter) checkKafkaReadiness(ctx context.Context) {
-	livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
-	healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
-	timeout := a.config.LiveProbeInterval
+// WaitUntilKvStoreConnectionIsUp polls kvClient every connectionRetryInterval until its connection is up, updating the
+// probe status of serviceName as it goes. It returns an error if kvClient is nil or if ctx is done before that.
+func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
+	if kvClient == nil {
+		return errors.New("kvclient-is-nil")
+	}
 	for {
-		timeoutTimer := time.NewTimer(timeout)
-
-		select {
-		case healthiness := <-healthinessChannel:
-			if !healthiness {
-				// logger.Fatal will call os.Exit(1) to terminate
-				logger.Fatal(ctx, "Kafka service has become unhealthy")
-			}
-		case liveliness := <-livelinessChannel:
-			if !liveliness {
-				// kafka not reachable or down, updating the status to not ready state
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-				timeout = a.config.NotLiveProbeInterval
-			} else {
-				// kafka is reachable , updating the status to running state
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-				timeout = a.config.LiveProbeInterval
-			}
-			// Check if the timer has expired or not
-			if !timeoutTimer.Stop() {
-				<-timeoutTimer.C
-			}
-		case <-timeoutTimer.C:
-			logger.Info(ctx, "kafka-proxy-liveness-recheck")
-			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
-			// the liveness probe may wait (and block) writing to our channel.
-			err := a.kafkaClient.SendLiveness(ctx)
-			if err != nil {
-				// Catch possible error case if sending liveness after Sarama has been stopped.
-				logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+		if !kvClient.IsConnectionUp(ctx) {
+			probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
+			select {
+			case <-time.After(connectionRetryInterval):
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
 			}
 		}
+		probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+		logger.Info(ctx, "kv-connection-up")
+		break
 	}
+	return nil
 }
 
 // Adapter Utility methods ##### end   #########
@@ -478,9 +493,10 @@
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	cf := config.NewAdapterFlags()
+	cf := &config.AdapterFlags{}
+	cf.ParseCommandArguments(os.Args[1:])
+
 	defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
-	cf.ParseCommandArguments()
 
 	// Setup logging