[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index c42cef4..ce5dc6f 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -28,38 +28,46 @@
"syscall"
"time"
- "github.com/opencord/voltha-lib-go/v5/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
- com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
- conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
- "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v5/pkg/events"
- "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-lib-go/v5/pkg/probe"
- "github.com/opencord/voltha-lib-go/v5/pkg/version"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/golang/protobuf/ptypes/empty"
+ conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-lib-go/v7/pkg/version"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/core"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
)
+// Names of the services this adapter registers with the probe (see p.RegisterService) and for which it
+// reports a status; each name is passed as the service name wherever that status is updated.
+const (
+ clusterMessagingService = "cluster-message-service" // connection to the Kafka cluster
+ onuAdapterService = "onu-adapter-service" // gRPC server of this adapter
+ kvService = "kv-service" // connection to the KV store
+ coreService = "core-service" // registration of this adapter with the Core
+)
+
+// adapter holds the configuration, the clients and the servers that make up one running
+// instance of the open ONU adapter process.
type adapter struct {
//defaultAppName string
- instanceID string
- config *config.AdapterFlags
- iAdapter adapters.IAdapter // from Voltha interface adapters
- kafkaClient kafka.Client
- kvClient kvstore.Client
- kip kafka.InterContainerProxy
- coreProxy adapterif.CoreProxy
- adapterProxy adapterif.AdapterProxy
- eventProxy eventif.EventProxy
- halted bool
- exitChannel chan int
- receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
+ instanceID string
+ config *config.AdapterFlags // adapter configuration flags
+ kafkaClient kafka.Client // Kafka client; also used as the message client of the event proxy
+ kvClient kvstore.Client // KV store client
+ eventProxy eventif.EventProxy // posts events to Kafka
+ grpcServer *vgrpc.GrpcServer // gRPC server on which the adapter and ONU inter-adapter services are added
+ onuAdapter *ac.OpenONUAC // open ONU adapter core; handler of both gRPC services
+ coreClient *vgrpc.Client // gRPC client towards the Core (config.CoreEndpoint)
+ halted bool // set to true by stop()
+ exitChannel chan int // created with a buffer of one in newAdapter
}
func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -68,7 +76,6 @@
a.config = cf
a.halted = false
a.exitChannel = make(chan int, 1)
- a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
return &a
}
@@ -82,11 +89,10 @@
p = value.(*probe.Probe)
p.RegisterService(
ctx,
- "message-bus",
- "kv-store",
- "container-proxy",
- "core-request-handler",
- "register-with-core",
+ clusterMessagingService,
+ kvService,
+ onuAdapterService,
+ coreService,
)
}
}
@@ -97,60 +103,84 @@
logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
}
- if p != nil {
- p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
- }
-
// Setup Log Config
cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
- if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
+ if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
- if p != nil {
- p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
+ // Start kafka communication with the broker
+ if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
+ logger.Fatal(ctx, "unable-to-connect-to-kafka")
}
- // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
- if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
- logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
- //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
- return err
+ // Wait until connection to KV store is established
+ if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
+ logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
- // Create the core proxy to handle requests to the Core
- a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
-
- logger.Debugw(ctx, "create adapter proxy", log.Fields{"CoreTopic": a.config.CoreTopic})
- a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
-
// Create the event proxy to post events to KAFKA
a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
+ go func() {
+ if err := a.eventProxy.Start(); err != nil {
+ logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+ }
+ }()
+
+ // Create the Core client to handle requests to the Core. Note that a.coreClient is a *vgrpc.Client wrapper; the
+ // core service client used for the actual calls is obtained by invoking GetCoreServiceClient on a.coreClient
+ if a.coreClient, err = vgrpc.NewClient(a.config.CoreEndpoint,
+ a.coreRestarted,
+ vgrpc.ActivityCheck(true)); err != nil {
+ logger.Fatal(ctx, "grpc-client-not-created")
+ }
+ // Start the core grpc client
+ go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
// Create the open ONU interface adapter
- if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
- a.config, cm); err != nil {
+ if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
}
- // Register the core request handler
- if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
- logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
- }
+ // Create the grpc server; it is started further down, after the services have been registered
+ a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
+
+ //Register the adapter service
+ a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
+
+ //Register the onu inter adapter service
+ a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
+
+ go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
// Register this adapter to the Core - retries indefinitely
- if err = a.registerWithCore(ctx, -1); err != nil {
+ if err = a.registerWithCore(ctx, coreService, -1); err != nil {
logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
}
- // check the readiness and liveliness and update the probe status
+ // Start the readiness and liveliness check and update the probe status
a.checkServicesReadiness(ctx)
return err
}
+// coreRestarted is the callback handed to vgrpc.NewClient when the Core client is created (presumably
+// invoked by that client when it detects a Core restart - to be confirmed against voltha-lib-go).
+// It currently only logs the endpoint and always returns nil.
+// TODO: Any action the adapter needs to do following a Core restart?
+func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
+ logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
+ return nil
+}
+
+// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up.
+// It creates a core service client on conn and queries the health status of the Core. The client is
+// returned only if that call succeeds and reports HEALTHY; in every other case nil is returned.
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ svc := core.NewCoreServiceClient(conn)
+ if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+ return nil
+ }
+ return svc
+}
+
func (a *adapter) stop(ctx context.Context) {
// Stop leadership tracking
a.halted = true
@@ -168,9 +198,20 @@
a.kvClient.Close(ctx)
}
- if a.kip != nil {
- a.kip.Stop(ctx)
+ if a.eventProxy != nil {
+ a.eventProxy.Stop()
}
+
+ if a.kafkaClient != nil {
+ a.kafkaClient.Stop(ctx)
+ }
+
+ // Stop core client
+ if a.coreClient != nil {
+ a.coreClient.Stop(ctx)
+ }
+
+ // TODO: More cleanup
}
// #############################################
@@ -214,38 +255,10 @@
return nil
}
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
- logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"addr": a.config.KafkaAdapterAddress, "topic": a.config.Topic})
- var err error
- /* address config update acc. to [VOL-2736] */
- kip := kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
- kafka.MsgClient(a.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
- count := 0
- for {
- if err = kip.Start(ctx); err != nil {
- logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
- if retries == count {
- return nil, err
- }
- count++
- // Take a nap before retrying
- time.Sleep(2 * time.Second)
- } else {
- break
- }
- }
- probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
- logger.Info(ctx, "common-messaging-proxy-created")
- return kip, nil
-}
-
-func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
- cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
+func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
var err error
- sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
+ sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
if err = sAcONU.Start(ctx); err != nil {
logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
@@ -256,20 +269,7 @@
return sAcONU, nil
}
-func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
- logger.Info(ctx, "setting-request-handler")
- requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
- if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
- logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
- return err
-
- }
- probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
- logger.Info(ctx, "request-handler-setup-done")
- return nil
-}
-
-func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
logger.Infow(ctx, "registering-with-core", log.Fields{
@@ -282,43 +282,78 @@
Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Vendor: "VOLTHA OpenONUGo",
Version: version.VersionInfo.Version,
- Endpoint: a.config.Topic,
+ Endpoint: a.config.AdapterEndpoint,
Type: "brcm_openomci_onu",
CurrentReplica: int32(a.config.CurrentReplica),
TotalReplicas: int32(a.config.TotalReplicas),
}
types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
VendorIds: vendorIdsList,
- Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
+ AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
+ Adapter: "brcm_openomci_onu", // Deprecated attribute
AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
count := 0
for {
- if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
- logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
+ gClient, err := a.coreClient.GetCoreServiceClient()
+ if gClient != nil {
+ if gClient != nil {
+ if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ic.AdapterRegistration{
+ Adapter: adapterDescription,
+ DTypes: deviceTypes}); err == nil {
+ break
+ }
+ }
+ logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
if retries == count {
return err
}
count++
- // Take a nap before retrying
+ // Take a power nap before retrying
time.Sleep(2 * time.Second)
- } else {
- break
+
}
}
- probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "registered-with-core")
return nil
}
+// startGRPCService marks serviceName as running in the probe and starts the given grpc server; once
+// server.Start returns, serviceName is marked as stopped. It does not create or register any service handlers
+// itself. NOTE(review): server.Start presumably blocks while the server is serving - confirm in voltha-lib-go.
+func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
+ logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
+
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
+
+ server.Start(ctx)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
+}
+
+// addAdapterService adds to server a callback that registers handler as the AdapterService
+// implementation on the grpc.Server passed to that callback.
+func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.AdapterServiceServer) {
+ logger.Info(ctx, "adding-adapter-service")
+
+ server.AddService(func(gs *grpc.Server) {
+ adapter_services.RegisterAdapterServiceServer(gs, handler)
+ })
+}
+
+// addOnuInterAdapterService adds to server a callback that registers handler as the OnuInterAdapterService
+// implementation on the grpc.Server passed to that callback.
+func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.OnuInterAdapterServiceServer) {
+ logger.Info(ctx, "adding-onu-inter-adapter-service")
+
+ server.AddService(func(gs *grpc.Server) {
+ adapter_services.RegisterOnuInterAdapterServiceServer(gs, handler)
+ })
+}
+
/**
This function checks the liveliness and readiness of the kakfa and kv-client services
and update the status in the probe.
*/
func (a *adapter) checkServicesReadiness(ctx context.Context) {
// checks the kafka readiness
- go a.checkKafkaReadiness(ctx)
+ go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
// checks the kv-store readiness
go a.checkKvStoreReadiness(ctx)
@@ -333,19 +368,19 @@
timeout := a.config.LiveProbeInterval / 2
kvStoreChannel := make(chan bool, 1)
- // Default false to check the liveliness.
- kvStoreChannel <- false
+ // Default true - we are here only after we already had a KV store connection
+ kvStoreChannel <- true
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveliness := <-kvStoreChannel:
if !liveliness {
// kv-store not reachable or down, updating the status to not ready state
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
timeout = a.config.NotLiveProbeInterval
} else {
// kv-store is reachable , updating the status to running state
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
timeout = a.config.LiveProbeInterval / 2
}
// Check if the timer has expired or not
@@ -364,48 +399,28 @@
}
}
-/**
-This function checks the liveliness and readiness of the kafka service
-and update the status in the probe.
-*/
-func (a *adapter) checkKafkaReadiness(ctx context.Context) {
- livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
- healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
- timeout := a.config.LiveProbeInterval
+// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
+// context is done. While the connection is down, serviceName is reported as not ready in the probe and the check
+// is repeated after connectionRetryInterval; once the connection is up, serviceName is reported as running and nil
+// is returned. An error is returned when kvClient is nil or when ctx is done first (ctx.Err()).
+func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
+ if kvClient == nil {
+ return errors.New("kvclient-is-nil")
+ }
for {
- timeoutTimer := time.NewTimer(timeout)
-
- select {
- case healthiness := <-healthinessChannel:
- if !healthiness {
- // logger.Fatal will call os.Exit(1) to terminate
- logger.Fatal(ctx, "Kafka service has become unhealthy")
- }
- case liveliness := <-livelinessChannel:
- if !liveliness {
- // kafka not reachable or down, updating the status to not ready state
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- timeout = a.config.NotLiveProbeInterval
- } else {
- // kafka is reachable , updating the status to running state
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- timeout = a.config.LiveProbeInterval
- }
- // Check if the timer has expired or not
- if !timeoutTimer.Stop() {
- <-timeoutTimer.C
- }
- case <-timeoutTimer.C:
- logger.Info(ctx, "kafka-proxy-liveness-recheck")
- // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
- // the liveness probe may wait (and block) writing to our channel.
- err := a.kafkaClient.SendLiveness(ctx)
- if err != nil {
- // Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+ if !kvClient.IsConnectionUp(ctx) {
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
+ select {
+ case <-time.After(connectionRetryInterval):
+ continue
+ case <-ctx.Done():
+ return ctx.Err()
}
}
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Info(ctx, "kv-connection-up")
+ break
}
+ return nil
}
// Adapter Utility methods ##### end #########
@@ -478,9 +493,10 @@
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- cf := config.NewAdapterFlags()
+ cf := &config.AdapterFlags{}
+ cf.ParseCommandArguments(os.Args[1:])
+
defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
- cf.ParseCommandArguments()
// Setup logging