blob: 3fa68eff9dc99febd9330a1095628962c13caecf [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
npujar1d86a522019-11-14 17:11:16 +053021 "time"
22
sbarbari17d7e222019-11-05 10:02:29 -050023 "github.com/opencord/voltha-go/db/model"
khenaidoob9203542018-09-17 22:56:37 -040024 "github.com/opencord/voltha-go/rw_core/config"
Kent Hagerman2b216042020-04-03 18:28:56 -040025 "github.com/opencord/voltha-go/rw_core/core/adapter"
26 "github.com/opencord/voltha-go/rw_core/core/api"
27 "github.com/opencord/voltha-go/rw_core/core/device"
khenaidood948f772021-08-11 17:49:24 -040028 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
29 "github.com/opencord/voltha-lib-go/v7/pkg/events"
30 grpcserver "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
31 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v7/pkg/log"
33 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
34 "github.com/opencord/voltha-protos/v5/go/core"
35 "github.com/opencord/voltha-protos/v5/go/extension"
36 "github.com/opencord/voltha-protos/v5/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040037 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040038)
39
npujar1d86a522019-11-14 17:11:16 +053040// Core represent read,write core attributes
khenaidoob9203542018-09-17 22:56:37 -040041type Core struct {
khenaidood948f772021-08-11 17:49:24 -040042 Shutdown context.CancelFunc
43 Stopped chan struct{}
44 KafkaClient kafka.Client
khenaidoob9203542018-09-17 22:56:37 -040045}
46
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080047const (
khenaidood948f772021-08-11 17:49:24 -040048 clusterMessagingService = "cluster-message-service"
49 grpcNBIService = "grpc-nbi-service"
50 grpcSBIService = "grpc-sbi-service"
51 adapterService = "adapter-service"
52 kvService = "kv-service"
53 deviceService = "device-service"
54 logicalDeviceService = "logical-device-service"
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080055)
56
npujar1d86a522019-11-14 17:11:16 +053057// NewCore creates instance of rw core
khenaidood948f772021-08-11 17:49:24 -040058func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) (*Core, context.Context) {
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070059 // If the context has a probe then fetch it and register our services
Kent Hagerman2f0d0552020-04-23 17:28:52 -040060 if p := probe.GetProbeFromContext(ctx); p != nil {
61 p.RegisterService(
Rohan Agrawal31f21802020-06-12 05:38:46 +000062 ctx,
khenaidood948f772021-08-11 17:49:24 -040063 kvService,
64 adapterService,
65 grpcSBIService,
66 clusterMessagingService,
67 deviceService,
68 logicalDeviceService,
khenaidoob9203542018-09-17 22:56:37 -040069 )
70 }
71
khenaidood948f772021-08-11 17:49:24 -040072 // create kafka client for events
73 KafkaClient := kafka.NewSaramaClient(
74 kafka.Address(cf.KafkaClusterAddress),
75 kafka.ProducerReturnOnErrors(true),
76 kafka.ProducerReturnOnSuccess(true),
77 kafka.ProducerMaxRetries(6),
78 kafka.ProducerRetryBackoff(time.Millisecond*30),
79 kafka.AutoCreateTopic(true),
80 kafka.MetadatMaxRetries(15),
81 )
82
Kent Hagerman2f0d0552020-04-23 17:28:52 -040083 // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
84 shutdownCtx, cancelCtx := context.WithCancel(ctx)
85
khenaidood948f772021-08-11 17:49:24 -040086 rwCore := &Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: KafkaClient}
87 return rwCore, shutdownCtx
Kent Hagerman2f0d0552020-04-23 17:28:52 -040088}
89
khenaidood948f772021-08-11 17:49:24 -040090func (core *Core) Start(ctx context.Context, id string, cf *config.RWCoreFlags) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000091 logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040092
93 // deferred functions are used to run cleanup
94 // failing partway will stop anything that's been started
khenaidood948f772021-08-11 17:49:24 -040095 defer close(core.Stopped)
96 defer core.Shutdown()
Kent Hagerman2f0d0552020-04-23 17:28:52 -040097
Himani Chawlab4c25912020-11-12 17:16:38 +053098 logger.Info(ctx, "starting-rw-core-components")
Kent Hagerman2f0d0552020-04-23 17:28:52 -040099
100 // setup kv client
Rohan Agrawal31f21802020-06-12 05:38:46 +0000101 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
102 kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400103 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000104 logger.Fatal(ctx, err)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400105 }
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000106 defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400107
108 // sync logging config with kv store
Rohan Agrawal31f21802020-06-12 05:38:46 +0000109 cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400110 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000111 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400112
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700113 backend := cm.Backend
114 backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400115
116 // wait until connection to KV Store is up
khenaidood948f772021-08-11 17:49:24 -0400117 if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530118 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400119 }
khenaidood948f772021-08-11 17:49:24 -0400120 go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400121
khenaidood948f772021-08-11 17:49:24 -0400122 // Start kafka communications and artefacts
123 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
124 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Himani Chawlab4c25912020-11-12 17:16:38 +0530125 }
khenaidood948f772021-08-11 17:49:24 -0400126 defer core.KafkaClient.Stop(ctx)
Himani Chawlab4c25912020-11-12 17:16:38 +0530127
khenaidood948f772021-08-11 17:49:24 -0400128 // Create the event proxy to post events to KAFKA
129 eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
130 go func() {
131 if err := eventProxy.Start(); err != nil {
132 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
133 }
134 }()
135 defer eventProxy.Stop()
136
137 // Start the kafka monitoring routine
138 go kafka.MonitorKafkaReadiness(ctx, core.KafkaClient, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessagingService)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400139
Kent Hagermanf5a67352020-04-30 15:15:26 -0400140 // create kv path
141 dbPath := model.NewDBPath(backend)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400142
143 // load adapters & device types while other things are starting
khenaidood948f772021-08-11 17:49:24 -0400144 adapterMgr := adapter.NewAdapterManager(dbPath, id, backend, cf.LiveProbeInterval)
145 adapterMgr.Start(ctx, adapterService)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400146
147 // create the core of the system, the device managers
khenaidood948f772021-08-11 17:49:24 -0400148 deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, cf, id, eventProxy)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400149
khenaidoo7585a962021-06-10 16:15:38 -0400150 // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening
151 // triggered by logicalDeviceMgr.Start(Ctx)
khenaidood948f772021-08-11 17:49:24 -0400152 err = deviceMgr.Start(ctx, deviceService)
153 if err != nil {
154 logger.Fatalw(ctx, "failure-starting-device-manager", log.Fields{"error": err})
155 }
khenaidoo7585a962021-06-10 16:15:38 -0400156
157 // Start the logical device manager to load the logical devices.
khenaidood948f772021-08-11 17:49:24 -0400158 logicalDeviceMgr.Start(ctx, logicalDeviceService)
khenaidoo7585a962021-06-10 16:15:38 -0400159
khenaidood948f772021-08-11 17:49:24 -0400160 // Create and start the SBI gRPC service
161 grpcSBIServer := grpcserver.NewGrpcServer(cf.GrpcSBIAddress, nil, false, probe.GetProbeFromContext(ctx))
162 go startGrpcSbiService(ctx, grpcSBIServer, grpcSBIService, api.NewAPIHandler(deviceMgr, nil, adapterMgr))
163 defer grpcSBIServer.Stop()
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400164
khenaidood948f772021-08-11 17:49:24 -0400165 // In the case of a restart, let's wait until all the registered adapters are connected to the Core
166 // before starting the grpc server that handles NBI requests.
167 err = adapterMgr.WaitUntilConnectionsToAdaptersAreUp(ctx, cf.ConnectionRetryInterval)
168 if err != nil {
169 logger.Fatalw(ctx, "failure-connecting-to-adapters", log.Fields{"error": err})
170 }
171
172 // Create the NBI gRPC server
173 grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx))
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530174
175 //Register the 'Extension' service on this gRPC server
khenaidood948f772021-08-11 17:49:24 -0400176 addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr))
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530177
khenaidood948f772021-08-11 17:49:24 -0400178 go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
179 defer grpcNBIServer.Stop()
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400180
181 // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
182 <-ctx.Done()
183}
184
185// Stop brings down core services
186func (core *Core) Stop() {
khenaidood948f772021-08-11 17:49:24 -0400187 core.Shutdown()
188 <-core.Stopped
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400189}
190
khenaidood948f772021-08-11 17:49:24 -0400191// startGrpcSbiService creates the grpc core service handlers, registers it to the grpc server and starts the server
192func startGrpcSbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler core.CoreServiceServer) {
193 logger.Infow(ctx, "starting-grpc-sbi-service", log.Fields{"service": serviceName})
194
195 server.AddService(func(server *grpc.Server) { core.RegisterCoreServiceServer(server, handler) })
196 logger.Infow(ctx, "grpc-sbi-service-added", log.Fields{"service": serviceName})
197
198 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
199 logger.Infow(ctx, "grpc-sbi-server-started", log.Fields{"service": serviceName})
200 server.Start(ctx)
201 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
202}
203
204// startGrpcNbiService creates the grpc NBI service handlers, registers it to the grpc server and starts the server
205func startGrpcNbiService(ctx context.Context, server *grpcserver.GrpcServer, serviceName string, handler voltha.VolthaServiceServer) {
206 logger.Infow(ctx, "starting-grpc-nbi-service", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400207
208 server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
khenaidood948f772021-08-11 17:49:24 -0400209 logger.Infow(ctx, "grpc-nbi-service-added-and-started", log.Fields{"service": serviceName})
khenaidoob9203542018-09-17 22:56:37 -0400210
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400211 // Note that there is a small window here in which the core could return its status as ready,
212 // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
213 server.Start(ctx)
khenaidoob9203542018-09-17 22:56:37 -0400214}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530215
216func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
217 logger.Info(ctx, "extension-grpc-server-created")
218
219 server.AddService(func(server *grpc.Server) {
220 extension.RegisterExtensionServer(server, handler)
221 })
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530222}