blob: 8aa6c1cef2ab7f2cb6595b96b1468f4ed0534c1e [file] [log] [blame]
/*
* Copyright 2020-2023 Open Networking Foundation (ONF) and the ONF Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//Package main -> this is the entry point of the OpenOnuAdapter
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"os/signal"
"strings"
"syscall"
"time"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v7/pkg/events"
"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
"github.com/opencord/voltha-lib-go/v7/pkg/version"
"github.com/opencord/voltha-protos/v5/go/adapter_service"
"github.com/opencord/voltha-protos/v5/go/core_service"
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
"github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
)
const (
clusterMessagingService = "cluster-message-service"
onuAdapterService = "onu-adapter-service"
kvService = "kv-service"
coreService = "core-service"
)
type adapter struct {
//defaultAppName string
instanceID string
config *config.AdapterFlags
kafkaClient kafka.Client
kvClient kvstore.Client
eventProxy eventif.EventProxy
grpcServer *vgrpc.GrpcServer
onuAdapter *ac.OpenONUAC
onuInterAdapter *ac.OpenONUACInterAdapter
coreClient *vgrpc.Client
}
func newAdapter(cf *config.AdapterFlags) *adapter {
var a adapter
a.instanceID = cf.InstanceID
a.config = cf
return &a
}
func (a *adapter) start(ctx context.Context) error {
logger.Info(ctx, "Starting Core Adapter components")
var err error
var p *probe.Probe
if value := ctx.Value(probe.ProbeContextKey); value != nil {
if _, ok := value.(*probe.Probe); ok {
p = value.(*probe.Probe)
p.RegisterService(
ctx,
clusterMessagingService,
kvService,
onuAdapterService,
coreService,
)
}
}
// Setup KV Client
logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
if err = a.setKVClient(ctx); err != nil {
logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
}
// Setup Log Config
cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
// Start kafka communication with the broker
if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
logger.Fatal(ctx, "unable-to-connect-to-kafka")
}
// Wait until connection to KV store is established
if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
// Create the event proxy to post events to KAFKA
a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
go func() {
if err := a.eventProxy.Start(); err != nil {
logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
}
}()
// Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
// cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
if a.coreClient, err = vgrpc.NewClient(
a.config.AdapterEndpoint,
a.config.CoreEndpoint,
"core_service.CoreService",
a.coreRestarted); err != nil {
logger.Fatal(ctx, "grpc-client-not-created")
}
// the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
retryCodes := []codes.Code{
codes.Unavailable, // server is currently unavailable
codes.DeadlineExceeded, // deadline for the operation was exceeded
}
grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
// Start the core grpc client
go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
// Create the open ONU interface adapter
if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
}
// Create the open ONU Inter adapter
if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
}
// Create and start the grpc server
a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
//Register the adapter service
a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
//Register the onu inter adapter service
a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
// Register this adapter to the Core - retries indefinitely
if err = a.registerWithCore(ctx, coreService, -1); err != nil {
logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
}
// Start the readiness and liveliness check and update the probe status
a.checkServicesReadiness(ctx)
return err
}
// TODO: Any action the adapter needs to do following a Core restart?
func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
return nil
}
// getCoreServiceClientHandler is used to setup the remote gRPC service
func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
if conn == nil {
return nil
}
return core_service.NewCoreServiceClient(conn)
}
func (a *adapter) stop(ctx context.Context) {
// Cleanup the grpc services first
if err := a.onuAdapter.Stop(ctx); err != nil {
logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
}
if err := a.onuInterAdapter.Stop(ctx); err != nil {
logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
}
// Cleanup - applies only if we had a kvClient
if a.kvClient != nil {
// Release all reservations
if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
}
// Close the DB connection
go a.kvClient.Close(ctx)
}
if a.eventProxy != nil {
a.eventProxy.Stop()
}
if a.kafkaClient != nil {
a.kafkaClient.Stop(ctx)
}
// Stop core client
if a.coreClient != nil {
a.coreClient.Stop(ctx)
}
// TODO: More cleanup
}
// #############################################
// Adapter Utility methods ##### begin #########
func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "etcd":
return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
}
return nil, errors.New("unsupported-kv-store")
}
func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
switch clientType {
case "sarama":
return kafka.NewSaramaClient(
kafka.Address(addr),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
kafka.ProducerRetryBackoff(time.Millisecond*30),
kafka.MetadatMaxRetries(15)), nil
}
return nil, errors.New("unsupported-client-type")
}
func (a *adapter) setKVClient(ctx context.Context) error {
client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
if err != nil {
a.kvClient = nil
logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
return err
}
a.kvClient = client
return nil
}
func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
var err error
sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
if err = sAcONU.Start(ctx); err != nil {
logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
return nil, err
}
logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
return sAcONU, nil
}
func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
var err error
sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
if err = sAcONUInterAdapter.Start(ctx); err != nil {
logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
return nil, err
}
logger.Info(ctx, "OpenONUACInterAdapter-started")
return sAcONUInterAdapter, nil
}
func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
logger.Infow(ctx, "registering-with-core", log.Fields{
"adapterID": adapterID,
"currentReplica": a.config.CurrentReplica,
"totalReplicas": a.config.TotalReplicas,
"onuVendorIds": vendorIdsList,
})
adapterDescription := &voltha.Adapter{
Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Vendor: "VOLTHA OpenONUGo",
Version: version.VersionInfo.Version,
Endpoint: a.config.AdapterEndpoint,
Type: "brcm_openomci_onu",
CurrentReplica: int32(a.config.CurrentReplica),
TotalReplicas: int32(a.config.TotalReplicas),
}
types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
VendorIds: vendorIdsList,
AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
Adapter: "brcm_openomci_onu", // Deprecated attribute
AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
count := 0
for {
gClient, err := a.coreClient.GetCoreServiceClient()
if gClient != nil {
if gClient != nil {
if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
Adapter: adapterDescription,
DTypes: deviceTypes}); err == nil {
break
}
}
logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
if retries == count {
return err
}
count++
// Take a power nap before retrying
time.Sleep(2 * time.Second)
}
}
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "registered-with-core")
return nil
}
// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
server.Start(ctx)
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
}
func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
logger.Info(ctx, "adding-adapter-service")
server.AddService(func(gs *grpc.Server) {
adapter_service.RegisterAdapterServiceServer(gs, handler)
})
}
func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
logger.Info(ctx, "adding-onu-inter-adapter-service")
server.AddService(func(gs *grpc.Server) {
onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
})
}
/**
This function checks the liveliness and readiness of the kakfa and kv-client services
and update the status in the probe.
*/
func (a *adapter) checkServicesReadiness(ctx context.Context) {
// checks the kafka readiness
go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
// checks the kv-store readiness
go a.checkKvStoreReadiness(ctx)
}
/**
This function checks the liveliness and readiness of the kv-store service
and update the status in the probe.
*/
func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
// dividing the live probe interval by 2 to get updated status every 30s
timeout := a.config.LiveProbeInterval / 2
kvStoreChannel := make(chan bool, 1)
// Default true - we are here only after we already had a KV store connection
kvStoreChannel <- true
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveliness := <-kvStoreChannel:
if !liveliness {
// kv-store not reachable or down, updating the status to not ready state
probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
timeout = a.config.NotLiveProbeInterval
} else {
// kv-store is reachable , updating the status to running state
probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
timeout = a.config.LiveProbeInterval / 2
}
// Check if the timer has expired or not
if !timeoutTimer.Stop() {
<-timeoutTimer.C
}
case <-timeoutTimer.C:
// Check the status of the kv-store
logger.Info(ctx, "kv-store liveliness-recheck")
if a.kvClient.IsConnectionUp(ctx) {
kvStoreChannel <- true
} else {
kvStoreChannel <- false
}
}
}
}
// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
// context times out.
func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
if kvClient == nil {
return errors.New("kvclient-is-nil")
}
for {
if !kvClient.IsConnectionUp(ctx) {
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
select {
case <-time.After(connectionRetryInterval):
continue
case <-ctx.Done():
return ctx.Err()
}
}
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "kv-connection-up")
break
}
return nil
}
// Adapter Utility methods ##### end #########
// #############################################
func getVerifiedCodeVersion(ctx context.Context) string {
if version.VersionInfo.Version == "unknown-version" {
content, err := ioutil.ReadFile("VERSION")
if err == nil {
return string(content)
}
logger.Error(ctx, "'VERSION'-file not readable")
}
return version.VersionInfo.Version
}
func printVersion(appName string) {
fmt.Println(appName)
fmt.Println(version.VersionInfo.String(" "))
}
func printBanner() {
fmt.Println(" ____ ____ ___ ___ _ ")
fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
fmt.Println(" | | __| |")
fmt.Println(" |_| |____/")
fmt.Println(" ")
}
func waitForExit(ctx context.Context) int {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
exitChannel := make(chan int)
go func() {
select {
case <-ctx.Done():
logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
exitChannel <- 2
case s := <-signalChannel:
switch s {
case syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT:
logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
exitChannel <- 0
default:
logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
exitChannel <- 1
}
}
}()
code := <-exitChannel
return code
}
func main() {
start := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cf := &config.AdapterFlags{}
cf.ParseCommandArguments(os.Args[1:])
defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
// Setup logging
logLevel, err := log.StringToLogLevel(cf.LogLevel)
if err != nil {
logger.Fatalf(ctx, "Cannot setup logging, %s", err)
}
// Setup default logger - applies for packages that do not have specific logger set
if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
}
// Update all loggers (provisioned via init) with a common field
if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
}
log.SetAllLogLevel(logLevel)
realMain(ctx) //fatal on httpListen(0,6060) ...
defer func() {
_ = log.CleanUp()
}()
// Print version / build information and exit
if cf.DisplayVersionOnly {
printVersion(defaultAppName)
return
}
logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
// Print banner if specified
if cf.Banner {
printBanner()
}
logger.Infow(ctx, "config", log.Fields{"config": *cf})
ad := newAdapter(cf)
p := &probe.Probe{}
logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
if err != nil {
logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
} else {
defer log.TerminateTracing(closer)
}
go func() {
err := ad.start(probeCtx)
// If this operation returns an error
// cancel all operations using this context
if err != nil {
cancel()
}
}()
code := waitForExit(ctx)
logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
// Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
// Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
ctxWithCancel, cancelFunc := context.WithCancel(ctx)
// Cleanup before leaving
ad.stop(ctxWithCancel)
// Will halt any long-running stop routine gracefully
cancelFunc()
elapsed := time.Since(start)
logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
//logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
}