khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2018-present Open Networking Foundation |
| 3 | |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 16 | |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 17 | package core |
| 18 | |
| 19 | import ( |
| 20 | "context" |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 21 | "strconv" |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 22 | "time" |
| 23 | |
sbarbari | 17d7e22 | 2019-11-05 10:02:29 -0500 | [diff] [blame] | 24 | "github.com/opencord/voltha-go/db/model" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 25 | "github.com/opencord/voltha-go/rw_core/config" |
Kent Hagerman | 2b21604 | 2020-04-03 18:28:56 -0400 | [diff] [blame] | 26 | "github.com/opencord/voltha-go/rw_core/core/adapter" |
| 27 | "github.com/opencord/voltha-go/rw_core/core/api" |
| 28 | "github.com/opencord/voltha-go/rw_core/core/device" |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 29 | conf "github.com/opencord/voltha-lib-go/v3/pkg/config" |
serkant.uluderya | 2ae470f | 2020-01-21 11:13:09 -0800 | [diff] [blame] | 30 | "github.com/opencord/voltha-lib-go/v3/pkg/db" |
serkant.uluderya | 2ae470f | 2020-01-21 11:13:09 -0800 | [diff] [blame] | 31 | grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc" |
| 32 | "github.com/opencord/voltha-lib-go/v3/pkg/kafka" |
| 33 | "github.com/opencord/voltha-lib-go/v3/pkg/log" |
| 34 | "github.com/opencord/voltha-lib-go/v3/pkg/probe" |
| 35 | "github.com/opencord/voltha-protos/v3/go/voltha" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 36 | "google.golang.org/grpc" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 37 | ) |
| 38 | |
// Core is the handle returned by NewCore for a running read-write core.
// It carries only what is needed to request a shutdown and to wait for
// that shutdown to complete.
type Core struct {
	// shutdown cancels the context that was handed to start().
	shutdown context.CancelFunc
	// stopped is closed by start() when it returns, i.e. after its
	// deferred cleanup has been scheduled to run.
	stopped chan struct{}
}
| 44 | |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 45 | // NewCore creates instance of rw core |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 46 | func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core { |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 47 | // If the context has a probe then fetch it and register our services |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 48 | if p := probe.GetProbeFromContext(ctx); p != nil { |
| 49 | p.RegisterService( |
| 50 | "message-bus", |
| 51 | "kv-store", |
| 52 | "adapter-manager", |
| 53 | "grpc-service", |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 54 | ) |
| 55 | } |
| 56 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 57 | // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called |
| 58 | shutdownCtx, cancelCtx := context.WithCancel(ctx) |
| 59 | |
| 60 | core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})} |
| 61 | go core.start(shutdownCtx, id, cf) |
| 62 | return core |
| 63 | } |
| 64 | |
| 65 | func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) { |
| 66 | logger.Info("starting-core-services", log.Fields{"coreId": id}) |
| 67 | |
| 68 | // deferred functions are used to run cleanup |
| 69 | // failing partway will stop anything that's been started |
| 70 | defer close(core.stopped) |
| 71 | defer core.shutdown() |
| 72 | |
| 73 | logger.Info("Starting RW Core components") |
| 74 | |
| 75 | // setup kv client |
| 76 | logger.Debugw("create-kv-client", log.Fields{"kvstore": cf.KVStoreType}) |
| 77 | kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout) |
| 78 | if err != nil { |
| 79 | logger.Fatal(err) |
| 80 | } |
| 81 | defer stopKVClient(context.Background(), kvClient) |
| 82 | |
| 83 | // sync logging config with kv store |
| 84 | cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout) |
| 85 | go conf.StartLogLevelConfigProcessing(cm, ctx) |
| 86 | |
| 87 | backend := &db.Backend{ |
| 88 | Client: kvClient, |
| 89 | StoreType: cf.KVStoreType, |
| 90 | Host: cf.KVStoreHost, |
| 91 | Port: cf.KVStorePort, |
| 92 | Timeout: cf.KVStoreTimeout, |
| 93 | // Configure backend to push Liveness Status at least every (cf.LiveProbeInterval / 2) seconds |
| 94 | // so as to avoid trigger of Liveness check (due to Liveness timeout) when backend is alive |
| 95 | LivenessChannelInterval: cf.LiveProbeInterval / 2, |
| 96 | PathPrefix: cf.KVStoreDataPrefix, |
| 97 | } |
| 98 | |
| 99 | // wait until connection to KV Store is up |
| 100 | if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil { |
| 101 | logger.Fatal("Unable-to-connect-to-KV-store") |
| 102 | } |
| 103 | go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval) |
| 104 | |
| 105 | // create kafka client |
| 106 | kafkaClient := kafka.NewSaramaClient( |
| 107 | kafka.Host(cf.KafkaAdapterHost), |
| 108 | kafka.Port(cf.KafkaAdapterPort), |
| 109 | kafka.ConsumerType(kafka.GroupCustomer), |
| 110 | kafka.ProducerReturnOnErrors(true), |
| 111 | kafka.ProducerReturnOnSuccess(true), |
| 112 | kafka.ProducerMaxRetries(6), |
| 113 | kafka.NumPartitions(3), |
| 114 | kafka.ConsumerGroupName(id), |
| 115 | kafka.ConsumerGroupPrefix(id), |
| 116 | kafka.AutoCreateTopic(true), |
| 117 | kafka.ProducerFlushFrequency(5), |
| 118 | kafka.ProducerRetryBackoff(time.Millisecond*30), |
| 119 | kafka.LivenessChannelInterval(cf.LiveProbeInterval/2), |
| 120 | ) |
| 121 | // defer kafkaClient.Stop() |
| 122 | |
| 123 | // create kv proxy |
| 124 | proxy := model.NewProxy(backend, "/") |
| 125 | |
| 126 | // load adapters & device types while other things are starting |
| 127 | adapterMgr := adapter.NewAdapterManager(proxy, id, kafkaClient) |
| 128 | go adapterMgr.Start(ctx) |
| 129 | |
| 130 | // connect to kafka, then wait until reachable and publisher/consumer created |
| 131 | // core.kmp must be created before deviceMgr and adapterMgr |
| 132 | kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterHost, cf.KafkaAdapterPort, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval) |
| 133 | if err != nil { |
| 134 | logger.Warn("Failed to setup kafka connection") |
| 135 | return |
| 136 | } |
| 137 | defer kmp.Stop() |
| 138 | go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval) |
| 139 | |
| 140 | // create the core of the system, the device managers |
| 141 | endpointMgr := kafka.NewEndpointManager(backend) |
| 142 | deviceMgr, logicalDeviceMgr := device.NewManagers(proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout) |
| 143 | |
| 144 | // register kafka RPC handler |
| 145 | registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic) |
| 146 | |
| 147 | // start gRPC handler |
| 148 | grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx)) |
| 149 | go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr)) |
| 150 | defer grpcServer.Stop() |
| 151 | |
| 152 | // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions |
| 153 | <-ctx.Done() |
| 154 | } |
| 155 | |
| 156 | // Stop brings down core services |
| 157 | func (core *Core) Stop() { |
| 158 | core.shutdown() |
| 159 | <-core.stopped |
| 160 | } |
| 161 | |
| 162 | // startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server |
| 163 | func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) { |
| 164 | logger.Info("grpc-server-created") |
| 165 | |
| 166 | server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) }) |
Girish Kumar | f56a468 | 2020-03-20 20:07:46 +0000 | [diff] [blame] | 167 | logger.Info("grpc-service-added") |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 168 | |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 169 | probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning) |
Girish Kumar | f56a468 | 2020-03-20 20:07:46 +0000 | [diff] [blame] | 170 | logger.Info("grpc-server-started") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 171 | // Note that there is a small window here in which the core could return its status as ready, |
| 172 | // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short. |
| 173 | server.Start(ctx) |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 174 | probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped) |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 175 | } |