blob: 43e990357fba0814fdad5d135c51047cbb593fea [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
npujar1d86a522019-11-14 17:11:16 +053021 "time"
22
sbarbari17d7e222019-11-05 10:02:29 -050023 "github.com/opencord/voltha-go/db/model"
khenaidoob9203542018-09-17 22:56:37 -040024 "github.com/opencord/voltha-go/rw_core/config"
Kent Hagerman2b216042020-04-03 18:28:56 -040025 "github.com/opencord/voltha-go/rw_core/core/adapter"
26 "github.com/opencord/voltha-go/rw_core/core/api"
27 "github.com/opencord/voltha-go/rw_core/core/device"
Maninderdfadc982020-10-28 14:04:33 +053028 conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
29 grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
30 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
31 "github.com/opencord/voltha-lib-go/v4/pkg/log"
32 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
Salman Siddiqui1cf95042020-11-19 00:42:56 +053033 "github.com/opencord/voltha-protos/v4/go/extension"
Maninderdfadc982020-10-28 14:04:33 +053034 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040035 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040036)
37
npujar1d86a522019-11-14 17:11:16 +053038// Core represent read,write core attributes
khenaidoob9203542018-09-17 22:56:37 -040039type Core struct {
Kent Hagerman2f0d0552020-04-23 17:28:52 -040040 shutdown context.CancelFunc
41 stopped chan struct{}
khenaidoob9203542018-09-17 22:56:37 -040042}
43
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080044const (
45 adapterMessageBus = "adapter-message-bus"
46 clusterMessageBus = "cluster-message-bus"
47)
48
npujar1d86a522019-11-14 17:11:16 +053049// NewCore creates instance of rw core
Kent Hagerman2f0d0552020-04-23 17:28:52 -040050func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070051 // If the context has a probe then fetch it and register our services
Kent Hagerman2f0d0552020-04-23 17:28:52 -040052 if p := probe.GetProbeFromContext(ctx); p != nil {
53 p.RegisterService(
Rohan Agrawal31f21802020-06-12 05:38:46 +000054 ctx,
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080055 adapterMessageBus,
Kent Hagerman2f0d0552020-04-23 17:28:52 -040056 "kv-store",
57 "adapter-manager",
khenaidoo7585a962021-06-10 16:15:38 -040058 "device-manager",
59 "logical-device-manager",
Kent Hagerman2f0d0552020-04-23 17:28:52 -040060 "grpc-service",
khenaidoo5e4fca32021-05-12 16:02:23 -040061 "adapter-request-handler",
khenaidoob9203542018-09-17 22:56:37 -040062 )
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080063
64 if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
65 p.RegisterService(
66 ctx,
67 clusterMessageBus,
68 )
69 }
khenaidoob9203542018-09-17 22:56:37 -040070 }
71
Kent Hagerman2f0d0552020-04-23 17:28:52 -040072 // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
73 shutdownCtx, cancelCtx := context.WithCancel(ctx)
74
75 core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})}
76 go core.start(shutdownCtx, id, cf)
77 return core
78}
79
80func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000081 logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040082
83 // deferred functions are used to run cleanup
84 // failing partway will stop anything that's been started
85 defer close(core.stopped)
86 defer core.shutdown()
87
Himani Chawlab4c25912020-11-12 17:16:38 +053088 logger.Info(ctx, "starting-rw-core-components")
Kent Hagerman2f0d0552020-04-23 17:28:52 -040089
90 // setup kv client
Rohan Agrawal31f21802020-06-12 05:38:46 +000091 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
92 kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040093 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000094 logger.Fatal(ctx, err)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040095 }
Rohan Agrawalcf12f202020-08-03 04:42:01 +000096 defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040097
98 // sync logging config with kv store
Rohan Agrawal31f21802020-06-12 05:38:46 +000099 cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400100 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumarf8d4f8d2020-08-18 11:45:30 +0000101 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400102
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700103 backend := cm.Backend
104 backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400105
106 // wait until connection to KV Store is up
107 if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530108 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400109 }
110 go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
111
112 // create kafka client
113 kafkaClient := kafka.NewSaramaClient(
Neha Sharmad1387da2020-05-07 20:07:28 +0000114 kafka.Address(cf.KafkaAdapterAddress),
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400115 kafka.ConsumerType(kafka.GroupCustomer),
116 kafka.ProducerReturnOnErrors(true),
117 kafka.ProducerReturnOnSuccess(true),
118 kafka.ProducerMaxRetries(6),
119 kafka.NumPartitions(3),
120 kafka.ConsumerGroupName(id),
121 kafka.ConsumerGroupPrefix(id),
122 kafka.AutoCreateTopic(true),
123 kafka.ProducerFlushFrequency(5),
124 kafka.ProducerRetryBackoff(time.Millisecond*30),
125 kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
126 )
Himani Chawlab4c25912020-11-12 17:16:38 +0530127
128 // create kafka client for events
129 kafkaClientEvent := kafka.NewSaramaClient(
130 kafka.Address(cf.KafkaClusterAddress),
131 kafka.ProducerReturnOnErrors(true),
132 kafka.ProducerReturnOnSuccess(true),
133 kafka.ProducerMaxRetries(6),
134 kafka.ProducerRetryBackoff(time.Millisecond*30),
135 kafka.AutoCreateTopic(true),
136 kafka.MetadatMaxRetries(15),
137 )
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800138
Himani Chawlab4c25912020-11-12 17:16:38 +0530139 // create event proxy
khenaidoo5e4fca32021-05-12 16:02:23 -0400140 updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress
141 eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800142 if err != nil {
143 logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
Himani Chawlab4c25912020-11-12 17:16:38 +0530144 return
145 }
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800146 if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
147 // if we're using a single kafka cluster we don't need two liveliness probes on the same cluster
148 go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
149 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530150
Himani Chawla606a4f02021-03-23 19:45:58 +0530151 defer stopEventProxy(ctx, kafkaClientEvent, eventProxy)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400152
Kent Hagermanf5a67352020-04-30 15:15:26 -0400153 // create kv path
154 dbPath := model.NewDBPath(backend)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400155
156 // load adapters & device types while other things are starting
Rohan Agrawal31f21802020-06-12 05:38:46 +0000157 adapterMgr := adapter.NewAdapterManager(ctx, dbPath, id, kafkaClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400158 go adapterMgr.Start(ctx)
159
160 // connect to kafka, then wait until reachable and publisher/consumer created
161 // core.kmp must be created before deviceMgr and adapterMgr
David Bainbridge9ae13132020-06-22 17:28:01 -0700162 kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400163 if err != nil {
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800164 logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400165 return
166 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000167 defer kmp.Stop(ctx)
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800168 go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400169
170 // create the core of the system, the device managers
171 endpointMgr := kafka.NewEndpointManager(backend)
Maninder0aabf0c2021-03-17 14:55:14 +0530172 deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf, id, eventProxy)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400173
khenaidoo7585a962021-06-10 16:15:38 -0400174 // Start the device manager to load the devices. Wait until it is completed to prevent multiple loading happening
175 // triggered by logicalDeviceMgr.Start(Ctx)
176 deviceMgr.Start(ctx)
177
178 // Start the logical device manager to load the logical devices.
179 logicalDeviceMgr.Start(ctx)
180
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400181 // register kafka RPC handler
khenaidoo5e4fca32021-05-12 16:02:23 -0400182 registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400183
184 // start gRPC handler
Neha Sharmad1387da2020-05-07 20:07:28 +0000185 grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530186
187 //Register the 'Extension' service on this gRPC server
188 addGRPCExtensionService(ctx, grpcServer, device.GetNewExtensionManager(deviceMgr))
189
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400190 go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
191 defer grpcServer.Stop()
192
193 // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
194 <-ctx.Done()
195}
196
197// Stop brings down core services
198func (core *Core) Stop() {
199 core.shutdown()
200 <-core.stopped
201}
202
203// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
204func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000205 logger.Info(ctx, "grpc-server-created")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400206
207 server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000208 logger.Info(ctx, "grpc-service-added")
khenaidoob9203542018-09-17 22:56:37 -0400209
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700210 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000211 logger.Info(ctx, "grpc-server-started")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400212 // Note that there is a small window here in which the core could return its status as ready,
213 // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
214 server.Start(ctx)
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700215 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
khenaidoob9203542018-09-17 22:56:37 -0400216}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530217
218func addGRPCExtensionService(ctx context.Context, server *grpcserver.GrpcServer, handler extension.ExtensionServer) {
219 logger.Info(ctx, "extension-grpc-server-created")
220
221 server.AddService(func(server *grpc.Server) {
222 extension.RegisterExtensionServer(server, handler)
223 })
224
225}