/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 16 | |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 17 | package core |
| 18 | |
| 19 | import ( |
| 20 | "context" |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 21 | "time" |
| 22 | |
sbarbari | 17d7e22 | 2019-11-05 10:02:29 -0500 | [diff] [blame] | 23 | "github.com/opencord/voltha-go/db/model" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 24 | "github.com/opencord/voltha-go/rw_core/config" |
Kent Hagerman | 2b21604 | 2020-04-03 18:28:56 -0400 | [diff] [blame] | 25 | "github.com/opencord/voltha-go/rw_core/core/adapter" |
| 26 | "github.com/opencord/voltha-go/rw_core/core/api" |
| 27 | "github.com/opencord/voltha-go/rw_core/core/device" |
Maninder | dfadc98 | 2020-10-28 14:04:33 +0530 | [diff] [blame] | 28 | conf "github.com/opencord/voltha-lib-go/v4/pkg/config" |
| 29 | grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc" |
| 30 | "github.com/opencord/voltha-lib-go/v4/pkg/kafka" |
| 31 | "github.com/opencord/voltha-lib-go/v4/pkg/log" |
| 32 | "github.com/opencord/voltha-lib-go/v4/pkg/probe" |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 33 | "github.com/opencord/voltha-protos/v4/go/extension" |
Maninder | dfadc98 | 2020-10-28 14:04:33 +0530 | [diff] [blame] | 34 | "github.com/opencord/voltha-protos/v4/go/voltha" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 35 | "google.golang.org/grpc" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 36 | ) |
| 37 | |
// Core is the handle to a running read/write core. It carries only what is
// needed to stop the core and to observe that it has finished stopping.
type Core struct {
	// shutdown cancels the context handed to the core's start routine.
	shutdown context.CancelFunc
	// stopped is closed once the start routine has returned.
	stopped chan struct{}
}
| 43 | |
// Probe service names under which the two kafka connections report their status.
const (
	// adapterMessageBus names the kafka connection used to talk to adapters.
	adapterMessageBus = "adapter-message-bus"
	// clusterMessageBus names the kafka connection used for events; it is only
	// registered when the cluster address differs from the adapter address.
	clusterMessageBus = "cluster-message-bus"
)
| 48 | |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 49 | // NewCore creates instance of rw core |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 50 | func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core { |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 51 | // If the context has a probe then fetch it and register our services |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 52 | if p := probe.GetProbeFromContext(ctx); p != nil { |
| 53 | p.RegisterService( |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 54 | ctx, |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 55 | adapterMessageBus, |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 56 | "kv-store", |
| 57 | "adapter-manager", |
khenaidoo | 7585a96 | 2021-06-10 16:15:38 -0400 | [diff] [blame^] | 58 | "device-manager", |
| 59 | "logical-device-manager", |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 60 | "grpc-service", |
khenaidoo | 5e4fca3 | 2021-05-12 16:02:23 -0400 | [diff] [blame] | 61 | "adapter-request-handler", |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 62 | ) |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 63 | |
| 64 | if cf.KafkaAdapterAddress != cf.KafkaClusterAddress { |
| 65 | p.RegisterService( |
| 66 | ctx, |
| 67 | clusterMessageBus, |
| 68 | ) |
| 69 | } |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 70 | } |
| 71 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 72 | // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called |
| 73 | shutdownCtx, cancelCtx := context.WithCancel(ctx) |
| 74 | |
| 75 | core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})} |
| 76 | go core.start(shutdownCtx, id, cf) |
| 77 | return core |
| 78 | } |
| 79 | |
| 80 | func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) { |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 81 | logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id}) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 82 | |
| 83 | // deferred functions are used to run cleanup |
| 84 | // failing partway will stop anything that's been started |
| 85 | defer close(core.stopped) |
| 86 | defer core.shutdown() |
| 87 | |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 88 | logger.Info(ctx, "starting-rw-core-components") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 89 | |
| 90 | // setup kv client |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 91 | logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType}) |
| 92 | kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 93 | if err != nil { |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 94 | logger.Fatal(ctx, err) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 95 | } |
Rohan Agrawal | cf12f20 | 2020-08-03 04:42:01 +0000 | [diff] [blame] | 96 | defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 97 | |
| 98 | // sync logging config with kv store |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 99 | cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 100 | go conf.StartLogLevelConfigProcessing(cm, ctx) |
Girish Kumar | f8d4f8d | 2020-08-18 11:45:30 +0000 | [diff] [blame] | 101 | go conf.StartLogFeaturesConfigProcessing(cm, ctx) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 102 | |
serkant.uluderya | 8ff291d | 2020-05-20 00:58:00 -0700 | [diff] [blame] | 103 | backend := cm.Backend |
| 104 | backend.LivenessChannelInterval = cf.LiveProbeInterval / 2 |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 105 | |
| 106 | // wait until connection to KV Store is up |
| 107 | if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil { |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 108 | logger.Fatal(ctx, "unable-to-connect-to-kv-store") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 109 | } |
| 110 | go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval) |
| 111 | |
| 112 | // create kafka client |
| 113 | kafkaClient := kafka.NewSaramaClient( |
Neha Sharma | d1387da | 2020-05-07 20:07:28 +0000 | [diff] [blame] | 114 | kafka.Address(cf.KafkaAdapterAddress), |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 115 | kafka.ConsumerType(kafka.GroupCustomer), |
| 116 | kafka.ProducerReturnOnErrors(true), |
| 117 | kafka.ProducerReturnOnSuccess(true), |
| 118 | kafka.ProducerMaxRetries(6), |
| 119 | kafka.NumPartitions(3), |
| 120 | kafka.ConsumerGroupName(id), |
| 121 | kafka.ConsumerGroupPrefix(id), |
| 122 | kafka.AutoCreateTopic(true), |
| 123 | kafka.ProducerFlushFrequency(5), |
| 124 | kafka.ProducerRetryBackoff(time.Millisecond*30), |
| 125 | kafka.LivenessChannelInterval(cf.LiveProbeInterval/2), |
| 126 | ) |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 127 | |
| 128 | // create kafka client for events |
| 129 | kafkaClientEvent := kafka.NewSaramaClient( |
| 130 | kafka.Address(cf.KafkaClusterAddress), |
| 131 | kafka.ProducerReturnOnErrors(true), |
| 132 | kafka.ProducerReturnOnSuccess(true), |
| 133 | kafka.ProducerMaxRetries(6), |
| 134 | kafka.ProducerRetryBackoff(time.Millisecond*30), |
| 135 | kafka.AutoCreateTopic(true), |
| 136 | kafka.MetadatMaxRetries(15), |
| 137 | ) |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 138 | |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 139 | // create event proxy |
khenaidoo | 5e4fca3 | 2021-05-12 16:02:23 -0400 | [diff] [blame] | 140 | updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress |
| 141 | eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService) |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 142 | if err != nil { |
| 143 | logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection") |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 144 | return |
| 145 | } |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 146 | if cf.KafkaAdapterAddress != cf.KafkaClusterAddress { |
| 147 | // if we're using a single kafka cluster we don't need two liveliness probes on the same cluster |
| 148 | go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus) |
| 149 | } |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 150 | |
Himani Chawla | 606a4f0 | 2021-03-23 19:45:58 +0530 | [diff] [blame] | 151 | defer stopEventProxy(ctx, kafkaClientEvent, eventProxy) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 152 | |
Kent Hagerman | f5a6735 | 2020-04-30 15:15:26 -0400 | [diff] [blame] | 153 | // create kv path |
| 154 | dbPath := model.NewDBPath(backend) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 155 | |
| 156 | // load adapters & device types while other things are starting |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 157 | adapterMgr := adapter.NewAdapterManager(ctx, dbPath, id, kafkaClient) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 158 | go adapterMgr.Start(ctx) |
| 159 | |
| 160 | // connect to kafka, then wait until reachable and publisher/consumer created |
| 161 | // core.kmp must be created before deviceMgr and adapterMgr |
David Bainbridge | 9ae1313 | 2020-06-22 17:28:01 -0700 | [diff] [blame] | 162 | kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 163 | if err != nil { |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 164 | logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 165 | return |
| 166 | } |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 167 | defer kmp.Stop(ctx) |
Matteo Scandolo | b3ba79c | 2021-03-01 10:53:23 -0800 | [diff] [blame] | 168 | go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 169 | |
| 170 | // create the core of the system, the device managers |
| 171 | endpointMgr := kafka.NewEndpointManager(backend) |
Maninder | 0aabf0c | 2021-03-17 14:55:14 +0530 | [diff] [blame] | 172 | deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf, id, eventProxy) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 173 | |
khenaidoo | 7585a96 | 2021-06-10 16:15:38 -0400 | [diff] [blame^] | 174 | // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening |
| 175 | // triggered by logicalDeviceMgr.Start(Ctx) |
| 176 | deviceMgr.Start(ctx) |
| 177 | |
| 178 | // Start the logical device manager to load the logical devices. |
| 179 | logicalDeviceMgr.Start(ctx) |
| 180 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 181 | // register kafka RPC handler |
khenaidoo | 5e4fca3 | 2021-05-12 16:02:23 -0400 | [diff] [blame] | 182 | registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 183 | |
| 184 | // start gRPC handler |
Neha Sharma | d1387da | 2020-05-07 20:07:28 +0000 | [diff] [blame] | 185 | grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx)) |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 186 | |
| 187 | //Register the 'Extension' service on this gRPC server |
| 188 | addGRPCExtensionService(ctx, grpcServer, device.GetNewExtensionManager(deviceMgr)) |
| 189 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 190 | go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr)) |
| 191 | defer grpcServer.Stop() |
| 192 | |
| 193 | // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions |
| 194 | <-ctx.Done() |
| 195 | } |
| 196 | |
| 197 | // Stop brings down core services |
| 198 | func (core *Core) Stop() { |
| 199 | core.shutdown() |
| 200 | <-core.stopped |
| 201 | } |
| 202 | |
| 203 | // startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server |
| 204 | func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) { |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 205 | logger.Info(ctx, "grpc-server-created") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 206 | |
| 207 | server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) }) |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 208 | logger.Info(ctx, "grpc-service-added") |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 209 | |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 210 | probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning) |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 211 | logger.Info(ctx, "grpc-server-started") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 212 | // Note that there is a small window here in which the core could return its status as ready, |
| 213 | // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short. |
| 214 | server.Start(ctx) |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 215 | probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped) |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 216 | } |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 217 | |
| 218 | func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) { |
| 219 | logger.Info(ctx, "extension-grpc-server-created") |
| 220 | |
| 221 | server.AddService(func(server *grpc.Server) { |
| 222 | extension.RegisterExtensionServer(server, handler) |
| 223 | }) |
| 224 | |
| 225 | } |