blob: 58c6968fbcdae8fe9f0c35a2f86f9db6b2d5b21e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
npujar1d86a522019-11-14 17:11:16 +053021 "time"
22
sbarbari17d7e222019-11-05 10:02:29 -050023 "github.com/opencord/voltha-go/db/model"
khenaidoob9203542018-09-17 22:56:37 -040024 "github.com/opencord/voltha-go/rw_core/config"
Kent Hagerman2b216042020-04-03 18:28:56 -040025 "github.com/opencord/voltha-go/rw_core/core/adapter"
26 "github.com/opencord/voltha-go/rw_core/core/api"
27 "github.com/opencord/voltha-go/rw_core/core/device"
khenaidood948f772021-08-11 17:49:24 -040028 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
29 "github.com/opencord/voltha-lib-go/v7/pkg/events"
30 grpcserver "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
31 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v7/pkg/log"
33 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
khenaidoo9beaaf12021-10-19 17:32:01 -040034 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidood948f772021-08-11 17:49:24 -040035 "github.com/opencord/voltha-protos/v5/go/extension"
36 "github.com/opencord/voltha-protos/v5/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040037 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040038)
39
npujar1d86a522019-11-14 17:11:16 +053040// Core represent read,write core attributes
khenaidoob9203542018-09-17 22:56:37 -040041type Core struct {
khenaidood948f772021-08-11 17:49:24 -040042 Shutdown context.CancelFunc
43 Stopped chan struct{}
44 KafkaClient kafka.Client
khenaidooa46458b2021-12-15 16:50:44 -050045 adapterMgr *adapter.Manager
khenaidoob9203542018-09-17 22:56:37 -040046}
47
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080048const (
khenaidood948f772021-08-11 17:49:24 -040049 clusterMessagingService = "cluster-message-service"
50 grpcNBIService = "grpc-nbi-service"
51 grpcSBIService = "grpc-sbi-service"
52 adapterService = "adapter-service"
53 kvService = "kv-service"
54 deviceService = "device-service"
55 logicalDeviceService = "logical-device-service"
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080056)
57
npujar1d86a522019-11-14 17:11:16 +053058// NewCore creates instance of rw core
khenaidood948f772021-08-11 17:49:24 -040059func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) (*Core, context.Context) {
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070060 // If the context has a probe then fetch it and register our services
Kent Hagerman2f0d0552020-04-23 17:28:52 -040061 if p := probe.GetProbeFromContext(ctx); p != nil {
62 p.RegisterService(
Rohan Agrawal31f21802020-06-12 05:38:46 +000063 ctx,
khenaidood948f772021-08-11 17:49:24 -040064 kvService,
65 adapterService,
66 grpcSBIService,
67 clusterMessagingService,
68 deviceService,
69 logicalDeviceService,
khenaidoob9203542018-09-17 22:56:37 -040070 )
71 }
72
khenaidood948f772021-08-11 17:49:24 -040073 // create kafka client for events
74 KafkaClient := kafka.NewSaramaClient(
75 kafka.Address(cf.KafkaClusterAddress),
76 kafka.ProducerReturnOnErrors(true),
77 kafka.ProducerReturnOnSuccess(true),
78 kafka.ProducerMaxRetries(6),
79 kafka.ProducerRetryBackoff(time.Millisecond*30),
80 kafka.AutoCreateTopic(true),
81 kafka.MetadatMaxRetries(15),
82 )
83
Kent Hagerman2f0d0552020-04-23 17:28:52 -040084 // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
85 shutdownCtx, cancelCtx := context.WithCancel(ctx)
86
khenaidood948f772021-08-11 17:49:24 -040087 rwCore := &Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: KafkaClient}
88 return rwCore, shutdownCtx
Kent Hagerman2f0d0552020-04-23 17:28:52 -040089}
90
khenaidood948f772021-08-11 17:49:24 -040091func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000092 logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040093
Kent Hagerman2f0d0552020-04-23 17:28:52 -040094 // setup kv client
Rohan Agrawal31f21802020-06-12 05:38:46 +000095 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
96 kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040097 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000098 logger.Fatal(ctx, err)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040099 }
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000100 defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400101
102 // sync logging config with kv store
Rohan Agrawal31f21802020-06-12 05:38:46 +0000103 cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400104 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000105 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400106
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700107 backend := cm.Backend
108 backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400109
110 // wait until connection to KV Store is up
khenaidood948f772021-08-11 17:49:24 -0400111 if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530112 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400113 }
khenaidood948f772021-08-11 17:49:24 -0400114 go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400115
khenaidood948f772021-08-11 17:49:24 -0400116 // Start kafka communications and artefacts
117 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
118 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Himani Chawlab4c25912020-11-12 17:16:38 +0530119 }
khenaidood948f772021-08-11 17:49:24 -0400120 defer core.KafkaClient.Stop(ctx)
Himani Chawlab4c25912020-11-12 17:16:38 +0530121
kesavand92fac102022-03-16 12:33:06 +0530122 // create the voltha.events topic
123 topic := &kafka.Topic{Name: cf.EventTopic}
124 if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
125 if err != nil {
126 logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
127 }
128 }
129
khenaidood948f772021-08-11 17:49:24 -0400130 // Create the event proxy to post events to KAFKA
131 eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
132 go func() {
133 if err := eventProxy.Start(); err != nil {
134 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
135 }
136 }()
137 defer eventProxy.Stop()
138
139 // Start the kafka monitoring routine
140 go kafka.MonitorKafkaReadiness(ctx, core.KafkaClient, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessagingService)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400141
Kent Hagermanf5a67352020-04-30 15:15:26 -0400142 // create kv path
143 dbPath := model.NewDBPath(backend)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400144
145 // load adapters & device types while other things are starting
khenaidoo25057da2021-12-08 14:40:45 -0500146 adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval)
khenaidood948f772021-08-11 17:49:24 -0400147 adapterMgr.Start(ctx, adapterService)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400148
khenaidooa46458b2021-12-15 16:50:44 -0500149 // We do not do a defer adapterMgr.Stop() here as we want this to be ran as soon as
150 // the core is stopped
151 core.adapterMgr = adapterMgr
152
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400153 // create the core of the system, the device managers
khenaidood948f772021-08-11 17:49:24 -0400154 deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400155
khenaidoo7585a962021-06-10 16:15:38 -0400156 // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening
157 // triggered by logicalDeviceMgr.Start(Ctx)
khenaidood948f772021-08-11 17:49:24 -0400158 err = deviceMgr.Start(ctx, deviceService)
159 if err != nil {
160 logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err})
161 }
khenaidooa46458b2021-12-15 16:50:44 -0500162 defer deviceMgr.Stop(ctx, deviceService)
khenaidoo7585a962021-06-10 16:15:38 -0400163
164 // Start the logical device manager to load the logical devices.
khenaidood948f772021-08-11 17:49:24 -0400165 logicalDeviceMgr.Start(ctx, logicalDeviceService)
khenaidoo7585a962021-06-10 16:15:38 -0400166
khenaidood948f772021-08-11 17:49:24 -0400167 // Create and start the SBI gRPC service
168 grpcSBIServer := grpcserver.NewGrpcServer(cf.GrpcSBIAddress, nil, false, probe.GetProbeFromContext(ctx))
169 go startGrpcSbiService(ctx, grpcSBIServer, grpcSBIService, api.NewAPIHandler(deviceMgr, nil, adapterMgr))
170 defer grpcSBIServer.Stop()
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400171
khenaidood948f772021-08-11 17:49:24 -0400172 // In the case of a restart, let's wait until all the registered adapters are connected to the Core
173 // before starting the grpc server that handles NBI requests.
174 err = adapterMgr.WaitUntilConnectionsToAdaptersAreUp(ctx, cf.ConnectionRetryInterval)
175 if err != nil {
176 logger.Fatalw(ctx, "failure-connecting-to-adapters", log.Fields{"error": err})
177 }
178
179 // Create the NBI gRPC server
180 grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx))
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530181
182 //Register the 'Extension' service on this gRPC server
khenaidood948f772021-08-11 17:49:24 -0400183 addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr))
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530184
khenaidood948f772021-08-11 17:49:24 -0400185 go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
186 defer grpcNBIServer.Stop()
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400187
188 // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
189 <-ctx.Done()
190}
191
192// Stop brings down core services
khenaidooa46458b2021-12-15 16:50:44 -0500193func (core *Core) Stop(ctx context.Context) {
194 // Close all the grpc clients connections to the adapters first
195 core.adapterMgr.Stop(ctx)
khenaidood948f772021-08-11 17:49:24 -0400196 core.Shutdown()
khenaidooa46458b2021-12-15 16:50:44 -0500197 close(core.Stopped)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400198}
199
khenaidood948f772021-08-11 17:49:24 -0400200// startGrpcSbiService creates the grpc core service handlers, registers it to the grpc server and starts the server
khenaidoo9beaaf12021-10-19 17:32:01 -0400201func startGrpcSbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler core_service.CoreServiceServer) {
khenaidood948f772021-08-11 17:49:24 -0400202 logger.Infow(ctx, "starting-grpc-sbi-service", log.Fields{"service": serviceName})
203
khenaidoo9beaaf12021-10-19 17:32:01 -0400204 server.AddService(func(server *grpc.Server) { core_service.RegisterCoreServiceServer(server, handler) })
khenaidood948f772021-08-11 17:49:24 -0400205 logger.Infow(ctx, "grpc-sbi-service-added", log.Fields{"service": serviceName})
206
207 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
208 logger.Infow(ctx, "grpc-sbi-server-started", log.Fields{"service": serviceName})
209 server.Start(ctx)
210 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
211}
212
213// startGrpcNbiService creates the grpc NBI service handlers, registers it to the grpc server and starts the server
214func startGrpcNbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler voltha.VolthaServiceServer) {
215 logger.Infow(ctx, "starting-grpc-nbi-service", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400216
217 server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
khenaidood948f772021-08-11 17:49:24 -0400218 logger.Infow(ctx, "grpc-nbi-service-added-and-started", log.Fields{"service": serviceName})
khenaidoob9203542018-09-17 22:56:37 -0400219
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400220 // Note that there is a small window here in which the core could return its status as ready,
221 // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
222 server.Start(ctx)
khenaidoob9203542018-09-17 22:56:37 -0400223}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530224
225func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
226 logger.Info(ctx, "extension-grpc-server-created")
227
228 server.AddService(func(server *grpc.Server) {
229 extension.RegisterExtensionServer(server, handler)
230 })
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530231}