// blob: be2c534022ab44cdb0eaa95dd6f8b3529c57a41d [file] [log] [blame]
/*
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package main invokes the application
package main
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
codes "google.golang.org/grpc/codes"
conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v7/pkg/events"
"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
"github.com/opencord/voltha-lib-go/v7/pkg/version"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
"github.com/opencord/voltha-protos/v5/go/adapter_service"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/core_service"
"github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
)
// Service names used when registering with, and reporting status to, the probe.
const (
	// clusterMessagingService is the name passed to the Kafka start-up and
	// readiness-monitoring helpers.
	clusterMessagingService = "cluster-message-service"
	// oltAdapterService is the name under which this adapter's gRPC server is reported.
	oltAdapterService = "olt-adapter-service"
	// kvService is the name under which the KV store connection is reported.
	kvService = "kv-service"
	// coreService is marked running once registration with the Core succeeds.
	coreService = "core-service"
)
// adapter groups the configuration together with the clients and servers that
// are created in start and torn down in stop.
type adapter struct {
	// Identity and configuration, both taken from the command-line flags.
	instanceID string
	config *config.AdapterFlags

	// gRPC server and the two services registered on it.
	grpcServer *vgrpc.GrpcServer
	oltAdapter *ac.OpenOLT
	oltInterAdapter *ac.OpenOLTInterAdapter

	// Clients towards Kafka, the KV store and the Core, plus the event proxy
	// that is built on top of the Kafka client.
	kafkaClient kafka.Client
	kvClient kvstore.Client
	coreClient *vgrpc.Client
	eventProxy eventif.EventProxy

	// Shutdown bookkeeping: stop sets halted to true and sends 0 on exitChannel.
	halted bool
	exitChannel chan int
}
// newAdapter returns an adapter initialised from cf. Only the identity,
// configuration and shutdown fields are populated here; the clients and servers
// are left at their zero value. The exit channel has room for one pending value.
func newAdapter(cf *config.AdapterFlags) *adapter {
	return &adapter{
		instanceID:  cf.InstanceID,
		config:      cf,
		halted:      false,
		exitChannel: make(chan int, 1),
	}
}
// start brings up the adapter components in dependency order: probe
// registration, KV client, log-configuration watchers, Kafka client and event
// proxy, the gRPC client towards the Core, the OpenOLT adapter and inter-adapter,
// this adapter's own gRPC server, registration with the Core and finally the
// readiness monitors. Every start-up failure is logged at Fatal level together
// with the error that caused it.
func (a *adapter) start(ctx context.Context) {
	logger.Info(ctx, "Starting Core Adapter components")
	var err error

	// When a probe travels in the context, register the services whose status
	// is reported while start-up progresses.
	var p *probe.Probe
	if found, ok := ctx.Value(probe.ProbeContextKey).(*probe.Probe); ok {
		p = found
		p.RegisterService(
			ctx,
			clusterMessagingService,
			kvService,
			oltAdapterService,
			coreService,
		)
	}

	// Setup KV Client
	logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
	if err = a.setKVClient(ctx); err != nil {
		logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
	}
	if p != nil {
		p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
	}

	// Setup Log Config
	cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
	go conf.StartLogLevelConfigProcessing(cm, ctx)
	go conf.StartLogFeaturesConfigProcessing(cm, ctx)

	// Setup Kafka Client
	if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
		logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
	}

	// Start kafka communication with the broker
	if err = kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
		logger.Fatalw(ctx, "unable-to-connect-to-kafka", log.Fields{"error": err})
	}

	// Create the event proxy to post events to KAFKA
	a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
	go func() {
		if err := a.eventProxy.Start(); err != nil {
			logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
		}
	}()

	// Create the client used for requests to the Core. The typed Core service
	// client is obtained from it with GetCoreServiceClient (see registerWithCore);
	// coreRestarted is supplied as the restart callback.
	if a.coreClient, err = vgrpc.NewClient(
		a.config.AdapterEndpoint,
		a.config.CoreEndpoint,
		"core_service.CoreService",
		a.coreRestarted); err != nil {
		logger.Fatalw(ctx, "grpc-client-not-created", log.Fields{"error": err})
	}

	// Start the core grpc client. Only these status codes trigger a retry.
	retryCodes := []codes.Code{
		codes.Unavailable,      // server is currently unavailable
		codes.DeadlineExceeded, // deadline for the operation was exceeded
	}
	// The backoff function sets the wait time between gRPC retries. If it is not
	// set, the library default is used (reportedly 50ms), which is too low here.
	// The wait is PerRPCRetryTimeout with a jitter fraction of 0.2 applied.
	backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
	grpcRetryOptions := grpc_retry.UnaryClientInterceptor(
		grpc_retry.WithMax(a.config.MaxRetries),
		grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout),
		grpc_retry.WithCodes(retryCodes...),
		backoffCtxOption,
	)
	// Debugw (not Debug) so that the fields are emitted as structured data.
	logger.Debugw(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
	go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)

	// Create the open OLT adapter
	if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
		logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
	}

	// Create the open OLT inter-adapter
	if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
		logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
	}

	// Create the grpc server
	a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)

	// Register the adapter service
	a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)

	// Register the olt inter-adapter service
	a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)

	// Start the grpc server
	go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)

	// Register this adapter to the Core - retries indefinitely
	if err = a.registerWithCore(ctx, coreService, -1); err != nil {
		logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
	}

	// check the readiness and liveliness and update the probe status
	a.checkServicesReadiness(ctx)
}
// coreRestarted is the callback handed to vgrpc.NewClient in start. For now it
// only records the event, at Error level, with the affected endpoint and always
// returns nil.
// TODO: Any action the adapter needs to do following a Core restart?
func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
	fields := log.Fields{"endpoint": endPoint}
	logger.Errorw(ctx, "core-restarted", fields)
	return nil
}
// getCoreServiceClientHandler wraps conn in a Core service client. It returns
// nil when no connection is supplied. start passes it to the Core gRPC client's
// Start method.
func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
	if conn != nil {
		return core_service.NewCoreServiceClient(conn)
	}
	return nil
}
// checkServicesReadiness launches the two background monitors that keep the
// probe status of the Kafka and kv-store services up to date. It returns
// immediately; each monitor runs in its own goroutine.
func (a *adapter) checkServicesReadiness(ctx context.Context) {
	cfg := a.config

	// Kafka readiness monitor.
	go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, cfg.LiveProbeInterval, cfg.NotLiveProbeInterval, clusterMessagingService)

	// kv-store readiness monitor.
	go a.checkKvStoreReadiness(ctx)
}
// checkKvStoreReadiness periodically checks whether the kv-store connection is
// up and mirrors the result into the probe status of kvService. While the store
// is reachable it is re-checked every LiveProbeInterval/2; once it is found
// unreachable the re-check interval switches to NotLiveProbeInterval. The
// function blocks until ctx is cancelled, so it is meant to run in its own
// goroutine.
func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
	// Half the live probe interval, so the status is refreshed twice per interval
	// (every 30s per the original author's note — presumably a 60s default; confirm).
	timeout := a.config.LiveProbeInterval / 2
	// Capacity 1 lets the check result be handed over to the next loop iteration
	// without blocking.
	kvStoreChannel := make(chan bool, 1)

	// isUp performs one connectivity check, bounded to 2 seconds so that an
	// unresponsive store cannot block the monitor forever.
	isUp := func() bool {
		timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
		defer cancelFunc()
		return a.kvClient.IsConnectionUp(timeoutCtx)
	}

	kvStoreChannel <- isUp()
	for {
		timeoutTimer := time.NewTimer(timeout)
		select {
		case <-ctx.Done():
			// The adapter is shutting down: release the timer and end the monitor
			// instead of probing forever.
			timeoutTimer.Stop()
			return
		case liveliness := <-kvStoreChannel:
			if !liveliness {
				// kv-store not reachable or down, updating the status to not ready state
				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
				timeout = a.config.NotLiveProbeInterval
			} else {
				// kv-store is reachable, updating the status to running state
				probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
				timeout = a.config.LiveProbeInterval / 2
			}
			// Discard this iteration's timer; drain it if it already fired.
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			// Interval elapsed: re-check the kv-store and queue the result.
			logger.Info(ctx, "kv-store liveliness-recheck")
			kvStoreChannel <- isUp()
		}
	}
}
// stop shuts the adapter down: it flags the adapter as halted, posts 0 on the
// exit channel, stops the OLT adapter and inter-adapter, releases the KV store
// reservations and closes its connection, and finally stops the event proxy,
// the Kafka client and the Core client. Every component is skipped when it was
// never created, so stop is safe to call while start is still in progress.
func (a *adapter) stop(ctx context.Context) {
	// Stop leadership tracking
	a.halted = true

	// send exit signal
	a.exitChannel <- 0

	// Stop all grpc processing. start runs in its own goroutine, so a shutdown
	// signal can arrive before these two have been assigned.
	if a.oltAdapter != nil {
		if err := a.oltAdapter.Stop(ctx); err != nil {
			logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
		}
	}
	if a.oltInterAdapter != nil {
		if err := a.oltInterAdapter.Stop(ctx); err != nil {
			logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
		}
	}

	// Cleanup - applies only if we had a kvClient
	if a.kvClient != nil {
		// Release all reservations
		if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
			logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
		}
		// Close the DB connection in the background so that a slow close does
		// not hold up the rest of the shutdown.
		go a.kvClient.Close(ctx)
	}

	if a.eventProxy != nil {
		a.eventProxy.Stop()
	}

	if a.kafkaClient != nil {
		a.kafkaClient.Stop(ctx)
	}

	// Stop core client
	if a.coreClient != nil {
		a.coreClient.Stop(ctx)
	}

	logger.Info(ctx, "main-stop-processing-complete")

	// TODO: Stop child devices connections
	// TODO: More cleanup
}
// newKVClient creates the KV store client selected by storeType. The recognised
// values are "etcd", "redis" and "redis-sentinel"; the two redis variants differ
// only in the boolean passed to kvstore.NewRedisClient. Any other value yields
// the "unsupported-kv-store" error.
func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
	logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})

	switch storeType {
	case "etcd":
		return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
	case "redis":
		return kvstore.NewRedisClient(address, timeout, false)
	case "redis-sentinel":
		return kvstore.NewRedisClient(address, timeout, true)
	default:
		return nil, errors.New("unsupported-kv-store")
	}
}
// newKafkaClient creates the Kafka client selected by clientType for the broker
// at address. Only "sarama" is supported; any other value yields the
// "unsupported-client-type" error.
func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
	logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})

	if clientType != "sarama" {
		return nil, errors.New("unsupported-client-type")
	}

	// The producer reports both successes and errors, retries up to 6 times with
	// a 30ms backoff, and metadata retrieval is retried up to 15 times.
	client := kafka.NewSaramaClient(
		kafka.Address(address),
		kafka.ProducerReturnOnErrors(true),
		kafka.ProducerReturnOnSuccess(true),
		kafka.ProducerMaxRetries(6),
		kafka.ProducerRetryBackoff(time.Millisecond*30),
		kafka.MetadatMaxRetries(15),
	)
	return client, nil
}
// setKVClient creates the KV store client described by the adapter
// configuration and stores it in a.kvClient. On failure a.kvClient is reset to
// nil and the creation error is returned.
func (a *adapter) setKVClient(ctx context.Context) error {
	cfg := a.config
	client, err := newKVClient(ctx, cfg.KVStoreType, cfg.KVStoreAddress, cfg.KVStoreTimeout)
	if err == nil {
		a.kvClient = client
		return nil
	}
	a.kvClient = nil
	return err
}
// startGRPCService runs the given, already configured, gRPC server. The probe
// status of serviceName is set to running before server.Start is called and to
// stopped after it returns. start launches this in its own goroutine.
func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
	logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})

	// The status is published, and the "started" line logged, before the call to
	// server.Start below.
	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
	logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})

	server.Start(ctx)

	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
}
// addAdapterService queues the registration of handler as the AdapterService
// implementation on server.
func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
	logger.Info(ctx, "adding-adapter-service")

	register := func(gs *grpc.Server) {
		adapter_service.RegisterAdapterServiceServer(gs, handler)
	}
	server.AddService(register)
}
// addOltInterAdapterService queues the registration of handler as the
// OltInterAdapterService implementation on server.
func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler olt_inter_adapter_service.OltInterAdapterServiceServer) {
	logger.Info(ctx, "adding-olt-inter-adapter-service")

	register := func(gs *grpc.Server) {
		olt_inter_adapter_service.RegisterOltInterAdapterServiceServer(gs, handler)
	}
	server.AddService(register)
}
// startOpenOLT builds the OpenOLT adapter from the Core client, event proxy,
// adapter flags and config manager, then starts it. It returns the started
// adapter, or nil and the error reported by Start.
func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
	cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
	logger.Info(ctx, "starting-open-olt")

	openOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
	if startErr := openOLT.Start(ctx); startErr != nil {
		return nil, startErr
	}

	logger.Info(ctx, "open-olt-started")
	return openOLT, nil
}
// startOpenOLTInterAdapter builds the inter-adapter on top of the given OpenOLT
// adapter and starts it. It returns the started inter-adapter, or nil and the
// error reported by Start.
func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
	logger.Info(ctx, "starting-open-olt-inter-adapter")

	interAdapter := ac.NewOpenOLTInterAdapter(oo)
	if startErr := interAdapter.Start(ctx); startErr != nil {
		return nil, startErr
	}

	logger.Info(ctx, "open-olt-inter-adapter-started")
	return interAdapter, nil
}
// registerWithCore announces this adapter and the "openolt" device type to the
// Core. A failed attempt is retried after a 2 second pause. retries is the
// number of additional attempts allowed after the first one; a negative value
// never matches the attempt counter, so the call then retries until it
// succeeds. On success the probe status of serviceName is set to running and
// nil is returned; when the retries are exhausted the last error is returned.
func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
	adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
	logger.Infow(ctx, "registering-with-core", log.Fields{
		"adapterID":      adapterID,
		"currentReplica": a.config.CurrentReplica,
		"totalReplicas":  a.config.TotalReplicas,
	})

	adapterDescription := &voltha.Adapter{
		Id:      adapterID, // Unique name for the device type
		Vendor:  "VOLTHA OpenOLT",
		Version: version.VersionInfo.Version,
		// The Endpoint refers to the address this service is listening on.
		Endpoint:       a.config.AdapterEndpoint,
		Type:           "openolt",
		CurrentReplica: int32(a.config.CurrentReplica),
		TotalReplicas:  int32(a.config.TotalReplicas),
	}
	deviceTypes := &voltha.DeviceTypes{
		Items: []*voltha.DeviceType{{
			Id:                          "openolt",
			AdapterType:                 "openolt", // Type of the adapter that handles device type
			Adapter:                     "openolt", // Deprecated attribute
			AcceptsBulkFlowUpdate:       false,     // Currently openolt adapter does not support bulk flow handling
			AcceptsAddRemoveFlowUpdates: true,
		}},
	}

	for attempt := 0; ; attempt++ {
		gClient, err := a.coreClient.GetCoreServiceClient()
		if gClient != nil {
			request := &ca.AdapterRegistration{
				Adapter: adapterDescription,
				DTypes:  deviceTypes,
			}
			_, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), request)
			if err == nil {
				break
			}
		}

		logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": attempt, "gclient": gClient})
		if retries == attempt {
			return err
		}

		// Take a nap before retrying
		time.Sleep(2 * time.Second)
	}

	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
	logger.Info(ctx, "registered-with-core")
	return nil
}
// waitForExit blocks until the process receives SIGHUP, SIGINT, SIGTERM or
// SIGQUIT and returns the matching exit code: 0 for any of those four signals
// and 1 for any other signal.
func waitForExit(ctx context.Context) int {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	switch received := <-signals; received {
	case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
		logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": received})
		return 0
	default:
		logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": received})
		return 1
	}
}
// printBanner writes the ASCII-art banner to standard output, one row per line.
func printBanner() {
	rows := [...]string{
		` ____ ____ _ _______ `,
		` / _ \ / __ \| | |__ __|`,
		` | | | |_ __ ___ _ __ | | | | | | | `,
		` | | | | '_ \ / _ \ '_ \ | | | | | | | `,
		` | |__| | |_) | __/ | | || |__| | |____| | `,
		` \____/| .__/ \___|_| |_| \____/|______|_| `,
		` | | `,
		` |_| `,
		` `,
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
// printVersion writes the adapter title followed by the version information to
// standard output.
func printVersion() {
	const title = "VOLTHA OpenOLT Adapter"
	fmt.Println(title)

	info := version.VersionInfo.String(" ")
	fmt.Println(info)
}
// main is the process entry point. It parses the command-line flags, configures
// logging, optionally prints the version or banner, starts the probe server and
// the adapter in the background, then blocks until a termination signal arrives
// and finally stops the adapter and logs the total run time.
func main() {
	ctx := context.Background()
	startedAt := time.Now()

	flags := config.NewAdapterFlags()
	flags.ParseCommandArguments()

	// Setup logging
	level, err := log.StringToLogLevel(flags.LogLevel)
	if err != nil {
		logger.Fatalf(ctx, "Cannot setup logging, %s", err)
	}

	// Setup default logger - applies for packages that do not have specific logger set
	_, err = log.SetDefaultLogger(log.JSON, level, log.Fields{"instanceId": flags.InstanceID})
	if err != nil {
		logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
	}

	// Update all loggers (provisioned via init) with a common field
	err = log.UpdateAllLoggers(log.Fields{"instanceId": flags.InstanceID})
	if err != nil {
		logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
	}

	log.SetAllLogLevel(level)

	// realMain is not defined in this file; it is expected to come from
	// elsewhere in the package.
	realMain()

	// Flush any buffered log entries when main returns.
	defer func() {
		if cleanupErr := log.CleanUp(); cleanupErr != nil {
			logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": cleanupErr})
		}
	}()

	// Print version / build information and exit
	if flags.DisplayVersionOnly {
		printVersion()
		return
	}

	// Print banner if specified
	if flags.Banner {
		printBanner()
	}

	logger.Infow(ctx, "config", log.Fields{"config": *flags})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ad := newAdapter(flags)

	// Serve the probe endpoint and hand the probe to the adapter through the context.
	healthProbe := &probe.Probe{}
	go healthProbe.ListenAndServe(ctx, ad.config.ProbeAddress)
	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, healthProbe)

	closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(flags.TraceEnabled, flags.TraceAgentAddress, flags.LogCorrelationEnabled)
	if err == nil {
		defer log.TerminateTracing(closer)
	} else {
		logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
	}

	go ad.start(probeCtx)

	exitCode := waitForExit(ctx)
	logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": exitCode})

	// Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
	stopCtx, stopCancel := context.WithCancel(ctx)

	// Cleanup before leaving
	ad.stop(stopCtx)

	// Will halt any long-running stop routine gracefully
	stopCancel()

	runTime := time.Since(startedAt) / time.Second
	logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": runTime})
}
// [EOF]