khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 1 | /* |
Joey Armstrong | 5f51f2e | 2023-01-17 17:06:26 -0500 | [diff] [blame] | 2 | * Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 3 | |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 16 | |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 17 | package core |
| 18 | |
| 19 | import ( |
| 20 | "context" |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 21 | "time" |
| 22 | |
sbarbari | 17d7e22 | 2019-11-05 10:02:29 -0500 | [diff] [blame] | 23 | "github.com/opencord/voltha-go/db/model" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 24 | "github.com/opencord/voltha-go/rw_core/config" |
Kent Hagerman | 2b21604 | 2020-04-03 18:28:56 -0400 | [diff] [blame] | 25 | "github.com/opencord/voltha-go/rw_core/core/adapter" |
| 26 | "github.com/opencord/voltha-go/rw_core/core/api" |
| 27 | "github.com/opencord/voltha-go/rw_core/core/device" |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 28 | conf "github.com/opencord/voltha-lib-go/v7/pkg/config" |
| 29 | "github.com/opencord/voltha-lib-go/v7/pkg/events" |
| 30 | grpcserver "github.com/opencord/voltha-lib-go/v7/pkg/grpc" |
| 31 | "github.com/opencord/voltha-lib-go/v7/pkg/kafka" |
| 32 | "github.com/opencord/voltha-lib-go/v7/pkg/log" |
| 33 | "github.com/opencord/voltha-lib-go/v7/pkg/probe" |
khenaidoo | 9beaaf1 | 2021-10-19 17:32:01 -0400 | [diff] [blame] | 34 | "github.com/opencord/voltha-protos/v5/go/core_service" |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 35 | "github.com/opencord/voltha-protos/v5/go/extension" |
| 36 | "github.com/opencord/voltha-protos/v5/go/voltha" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 37 | "google.golang.org/grpc" |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 38 | ) |
| 39 | |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 40 | // Core represent read,write core attributes |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 41 | type Core struct { |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 42 | Shutdown context.CancelFunc |
| 43 | Stopped chan struct{} |
| 44 | KafkaClient kafka.Client |
khenaidoo | a46458b | 2021-12-15 16:50:44 -0500 | [diff] [blame] | 45 | adapterMgr *adapter.Manager |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 46 | } |
| 47 | |
// Names under which the core's subsystems are identified, e.g. when NewCore
// registers them with the probe and when status updates are reported.
// grpcNBIService is not part of the set that NewCore registers with the probe.
const (
	// persistence and messaging
	kvService               = "kv-service"
	clusterMessagingService = "cluster-message-service"

	// adapter and device handling
	adapterService       = "adapter-service"
	deviceService        = "device-service"
	logicalDeviceService = "logical-device-service"

	// gRPC endpoints
	grpcSBIService = "grpc-sbi-service"
	grpcNBIService = "grpc-nbi-service"
)
| 57 | |
npujar | 1d86a52 | 2019-11-14 17:11:16 +0530 | [diff] [blame] | 58 | // NewCore creates instance of rw core |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 59 | func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) (*Core, context.Context) { |
David K. Bainbridge | b4a9ab0 | 2019-09-20 15:12:16 -0700 | [diff] [blame] | 60 | // If the context has a probe then fetch it and register our services |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 61 | if p := probe.GetProbeFromContext(ctx); p != nil { |
| 62 | p.RegisterService( |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 63 | ctx, |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 64 | kvService, |
| 65 | adapterService, |
| 66 | grpcSBIService, |
| 67 | clusterMessagingService, |
| 68 | deviceService, |
| 69 | logicalDeviceService, |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 70 | ) |
| 71 | } |
| 72 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 73 | // create kafka client for events |
| 74 | KafkaClient := kafka.NewSaramaClient( |
| 75 | kafka.Address(cf.KafkaClusterAddress), |
| 76 | kafka.ProducerReturnOnErrors(true), |
| 77 | kafka.ProducerReturnOnSuccess(true), |
| 78 | kafka.ProducerMaxRetries(6), |
| 79 | kafka.ProducerRetryBackoff(time.Millisecond*30), |
| 80 | kafka.AutoCreateTopic(true), |
| 81 | kafka.MetadatMaxRetries(15), |
| 82 | ) |
| 83 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 84 | // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called |
| 85 | shutdownCtx, cancelCtx := context.WithCancel(ctx) |
| 86 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 87 | rwCore := &Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: KafkaClient} |
| 88 | return rwCore, shutdownCtx |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 89 | } |
| 90 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 91 | func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) { |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 92 | logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id}) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 93 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 94 | // setup kv client |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 95 | logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType}) |
| 96 | kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 97 | if err != nil { |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 98 | logger.Fatal(ctx, err) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 99 | } |
Rohan Agrawal | cf12f20 | 2020-08-03 04:42:01 +0000 | [diff] [blame] | 100 | defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 101 | |
| 102 | // sync logging config with kv store |
Rohan Agrawal | 31f2180 | 2020-06-12 05:38:46 +0000 | [diff] [blame] | 103 | cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 104 | go conf.StartLogLevelConfigProcessing(cm, ctx) |
Girish Kumar | f8d4f8d | 2020-08-18 11:45:30 +0000 | [diff] [blame] | 105 | go conf.StartLogFeaturesConfigProcessing(cm, ctx) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 106 | |
serkant.uluderya | 8ff291d | 2020-05-20 00:58:00 -0700 | [diff] [blame] | 107 | backend := cm.Backend |
| 108 | backend.LivenessChannelInterval = cf.LiveProbeInterval / 2 |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 109 | |
| 110 | // wait until connection to KV Store is up |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 111 | if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil { |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 112 | logger.Fatal(ctx, "unable-to-connect-to-kv-store") |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 113 | } |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 114 | go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 115 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 116 | // Start kafka communications and artefacts |
| 117 | if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil { |
| 118 | logger.Fatal(ctx, "unable-to-connect-to-kafka") |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 119 | } |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 120 | defer core.KafkaClient.Stop(ctx) |
Himani Chawla | b4c2591 | 2020-11-12 17:16:38 +0530 | [diff] [blame] | 121 | |
kesavand | 92fac10 | 2022-03-16 12:33:06 +0530 | [diff] [blame] | 122 | // create the voltha.events topic |
| 123 | topic := &kafka.Topic{Name: cf.EventTopic} |
| 124 | if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil { |
| 125 | if err != nil { |
| 126 | logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err}) |
| 127 | } |
| 128 | } |
| 129 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 130 | // Create the event proxy to post events to KAFKA |
| 131 | eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic})) |
| 132 | go func() { |
| 133 | if err := eventProxy.Start(); err != nil { |
| 134 | logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err}) |
| 135 | } |
| 136 | }() |
| 137 | defer eventProxy.Stop() |
| 138 | |
| 139 | // Start the kafka monitoring routine |
| 140 | go kafka.MonitorKafkaReadiness(ctx, core.KafkaClient, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessagingService) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 141 | |
Kent Hagerman | f5a6735 | 2020-04-30 15:15:26 -0400 | [diff] [blame] | 142 | // create kv path |
| 143 | dbPath := model.NewDBPath(backend) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 144 | |
| 145 | // load adapters & device types while other things are starting |
nikesh.krishnan | 0ded28d | 2023-06-28 12:36:32 +0530 | [diff] [blame] | 146 | adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval, cf.MaxRetries, cf.PerRPCRetryTimeout) |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 147 | adapterMgr.Start(ctx, adapterService) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 148 | |
khenaidoo | a46458b | 2021-12-15 16:50:44 -0500 | [diff] [blame] | 149 | // We do not do a defer adapterMgr.Stop() here as we want this to be ran as soon as |
| 150 | // the core is stopped |
| 151 | core.adapterMgr = adapterMgr |
| 152 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 153 | // create the core of the system, the device managers |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 154 | deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 155 | |
khenaidoo | 7585a96 | 2021-06-10 16:15:38 -0400 | [diff] [blame] | 156 | // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening |
| 157 | // triggered by logicalDeviceMgr.Start(Ctx) |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 158 | err = deviceMgr.Start(ctx, deviceService) |
| 159 | if err != nil { |
| 160 | logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err}) |
| 161 | } |
khenaidoo | a46458b | 2021-12-15 16:50:44 -0500 | [diff] [blame] | 162 | defer deviceMgr.Stop(ctx, deviceService) |
khenaidoo | 7585a96 | 2021-06-10 16:15:38 -0400 | [diff] [blame] | 163 | |
| 164 | // Start the logical device manager to load the logical devices. |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 165 | logicalDeviceMgr.Start(ctx, logicalDeviceService) |
khenaidoo | 7585a96 | 2021-06-10 16:15:38 -0400 | [diff] [blame] | 166 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 167 | // Create and start the SBI gRPC service |
| 168 | grpcSBIServer := grpcserver.NewGrpcServer(cf.GrpcSBIAddress, nil, false, probe.GetProbeFromContext(ctx)) |
| 169 | go startGrpcSbiService(ctx, grpcSBIServer, grpcSBIService, api.NewAPIHandler(deviceMgr, nil, adapterMgr)) |
| 170 | defer grpcSBIServer.Stop() |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 171 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 172 | // In the case of a restart, let's wait until all the registered adapters are connected to the Core |
| 173 | // before starting the grpc server that handles NBI requests. |
| 174 | err = adapterMgr.WaitUntilConnectionsToAdaptersAreUp(ctx, cf.ConnectionRetryInterval) |
| 175 | if err != nil { |
| 176 | logger.Fatalw(ctx, "failure-connecting-to-adapters", log.Fields{"error": err}) |
| 177 | } |
| 178 | |
| 179 | // Create the NBI gRPC server |
| 180 | grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx)) |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 181 | |
| 182 | //Register the 'Extension' service on this gRPC server |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 183 | addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr)) |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 184 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 185 | go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr)) |
| 186 | defer grpcNBIServer.Stop() |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 187 | |
| 188 | // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions |
| 189 | <-ctx.Done() |
| 190 | } |
| 191 | |
| 192 | // Stop brings down core services |
khenaidoo | a46458b | 2021-12-15 16:50:44 -0500 | [diff] [blame] | 193 | func (core *Core) Stop(ctx context.Context) { |
| 194 | // Close all the grpc clients connections to the adapters first |
| 195 | core.adapterMgr.Stop(ctx) |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 196 | core.Shutdown() |
khenaidoo | a46458b | 2021-12-15 16:50:44 -0500 | [diff] [blame] | 197 | close(core.Stopped) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 198 | } |
| 199 | |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 200 | // startGrpcSbiService creates the grpc core service handlers, registers it to the grpc server and starts the server |
khenaidoo | 9beaaf1 | 2021-10-19 17:32:01 -0400 | [diff] [blame] | 201 | func startGrpcSbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler core_service.CoreServiceServer) { |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 202 | logger.Infow(ctx, "starting-grpc-sbi-service", log.Fields{"service": serviceName}) |
| 203 | |
khenaidoo | 9beaaf1 | 2021-10-19 17:32:01 -0400 | [diff] [blame] | 204 | server.AddService(func(server *grpc.Server) { core_service.RegisterCoreServiceServer(server, handler) }) |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 205 | logger.Infow(ctx, "grpc-sbi-service-added", log.Fields{"service": serviceName}) |
| 206 | |
| 207 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning) |
| 208 | logger.Infow(ctx, "grpc-sbi-server-started", log.Fields{"service": serviceName}) |
| 209 | server.Start(ctx) |
| 210 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped) |
| 211 | } |
| 212 | |
| 213 | // startGrpcNbiService creates the grpc NBI service handlers, registers it to the grpc server and starts the server |
| 214 | func startGrpcNbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler voltha.VolthaServiceServer) { |
| 215 | logger.Infow(ctx, "starting-grpc-nbi-service", log.Fields{"service": serviceName}) |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 216 | |
| 217 | server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) }) |
khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 218 | logger.Infow(ctx, "grpc-nbi-service-added-and-started", log.Fields{"service": serviceName}) |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 219 | |
Kent Hagerman | 2f0d055 | 2020-04-23 17:28:52 -0400 | [diff] [blame] | 220 | // Note that there is a small window here in which the core could return its status as ready, |
| 221 | // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short. |
| 222 | server.Start(ctx) |
khenaidoo | b920354 | 2018-09-17 22:56:37 -0400 | [diff] [blame] | 223 | } |
Salman Siddiqui | 1cf9504 | 2020-11-19 00:42:56 +0530 | [diff] [blame] | 224 | |
// addGRPCExtensionService registers handler as the Extension service on server.
// It only adds the service; it does not start the server.
func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
	logger.Info(ctx, "extension-grpc-server-created")

	registerExtension := func(gs *grpc.Server) {
		extension.RegisterExtensionServer(gs, handler)
	}
	server.AddService(registerExtension)
}