blob: 2a207842ace0bc836d8783ffd88d8b01d48c834d [file] [log] [blame]
Stephane Barbariea75791c2019-01-24 10:58:06 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar03b018e2019-11-13 15:29:36 +053016
Stephane Barbariea75791c2019-01-24 10:58:06 -050017package core
18
19import (
20 "context"
Thomas Lee Se5a44012019-11-07 20:32:24 +053021 "fmt"
npujar03b018e2019-11-13 15:29:36 +053022
23 "time"
24
sbarbari17d7e222019-11-05 10:02:29 -050025 "github.com/opencord/voltha-go/db/model"
Stephane Barbariea75791c2019-01-24 10:58:06 -050026 "github.com/opencord/voltha-go/ro_core/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080027 "github.com/opencord/voltha-lib-go/v3/pkg/db"
28 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
29 grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
30 "github.com/opencord/voltha-lib-go/v3/pkg/log"
31 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
32 "github.com/opencord/voltha-protos/v3/go/voltha"
Stephane Barbariea75791c2019-01-24 10:58:06 -050033 "google.golang.org/grpc"
Divya Desai660dbba2019-10-16 07:06:49 +000034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Stephane Barbariea75791c2019-01-24 10:58:06 -050036)
37
// Core holds all information of read only core service
type Core struct {
	instanceID        string                 // unique identifier of this ro_core instance
	genericMgr        *ModelProxyManager     // serves generic model queries over the cluster proxy
	deviceMgr         *DeviceManager         // read-only access to devices
	logicalDeviceMgr  *LogicalDeviceManager  // read-only access to logical devices
	grpcServer        *grpcserver.GrpcServer // northbound gRPC server
	grpcNBIAPIHandler *APIHandler            // gRPC northbound API handler registered on grpcServer
	config            *config.ROCoreFlags    // command-line/runtime configuration
	clusterDataRoot   model.Root             // model root for cluster-wide (shared) data
	localDataRoot     model.Root             // model root for this instance's local data
	clusterDataProxy  *model.Proxy           // proxy rooted at "/" of clusterDataRoot
	localDataProxy    *model.Proxy           // proxy rooted at "/" of localDataRoot
	exitChannel       chan int               // signalled (buffered, size 1) by Stop()
	kvClient          kvstore.Client         // shared KV store client
	backend           db.Backend             // KV backend wrapping kvClient; also provides liveness reporting
}
55
56func init() {
npujar03b018e2019-11-13 15:29:36 +053057 _, err := log.AddPackage(log.JSON, log.DebugLevel, nil)
58 if err != nil {
59 log.Errorw("unable-to-register-package-to-the-log-map", log.Fields{"error": err})
60 }
Stephane Barbariea75791c2019-01-24 10:58:06 -050061}
62
npujar03b018e2019-11-13 15:29:36 +053063// NewCore instantiates core service parameters
Thomas Lee Se5a44012019-11-07 20:32:24 +053064func NewCore(ctx context.Context, id string, cf *config.ROCoreFlags, kvClient kvstore.Client) *Core {
Stephane Barbariea75791c2019-01-24 10:58:06 -050065 var core Core
npujar03b018e2019-11-13 15:29:36 +053066 core.instanceID = id
Stephane Barbariea75791c2019-01-24 10:58:06 -050067 core.exitChannel = make(chan int, 1)
68 core.config = cf
69 core.kvClient = kvClient
70
Girish Kumar91482642019-11-08 11:38:03 +000071 // Configure backend to push Liveness Status at least every cf.LiveProbeInterval / 2 seconds
72 // so as to avoid trigger of Liveness check (due to Liveness timeout) when backend is alive
73 livenessChannelInterval := cf.LiveProbeInterval / 2
74
Stephane Barbariea75791c2019-01-24 10:58:06 -050075 // Setup the KV store
76 // Do not call NewBackend constructor; it creates its own KV client
77 // Commented the backend for now until the issue between the model and the KV store
78 // is resolved.
Girish Kumar91482642019-11-08 11:38:03 +000079 core.backend = db.Backend{
80 Client: kvClient,
81 StoreType: cf.KVStoreType,
82 Host: cf.KVStoreHost,
83 Port: cf.KVStorePort,
84 Timeout: cf.KVStoreTimeout,
85 LivenessChannelInterval: livenessChannelInterval,
86 PathPrefix: "service/voltha"}
87 core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
88 core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
Thomas Lee Se5a44012019-11-07 20:32:24 +053089
Stephane Barbariea75791c2019-01-24 10:58:06 -050090 return &core
91}
92
Divya Desai660dbba2019-10-16 07:06:49 +000093// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
94func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
95 log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
96 "port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
97
98 // Get timeout in seconds with 1 second set as minimum
99 timeout := int(core.config.CoreTimeout.Seconds())
100 if timeout < 1 {
101 timeout = 1
102 }
103 count := 0
104 for {
105 if !core.kvClient.IsConnectionUp(timeout) {
106 log.Info("KV-store-unreachable")
107 if maxRetries != -1 {
108 if count >= maxRetries {
109 return status.Error(codes.Unavailable, "kv store unreachable")
110 }
111 }
npujar03b018e2019-11-13 15:29:36 +0530112 count++
Divya Desai660dbba2019-10-16 07:06:49 +0000113 // Take a nap before retrying
114 time.Sleep(retryInterval)
115 log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
116
117 } else {
118 break
119 }
120 }
121 log.Info("KV-store-reachable")
122 return nil
123}
124
npujar03b018e2019-11-13 15:29:36 +0530125// Start will start core adapter services
Thomas Lee Se5a44012019-11-07 20:32:24 +0530126func (core *Core) Start(ctx context.Context) error {
127 var err error
npujar03b018e2019-11-13 15:29:36 +0530128 log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceID})
Divya Desai660dbba2019-10-16 07:06:49 +0000129
130 // Wait until connection to KV Store is up
131 if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
132 log.Fatal("Unable-to-connect-to-KV-store")
133 }
134
135 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
136
Thomas Lee Se5a44012019-11-07 20:32:24 +0530137 core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
138 if err != nil {
139 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
140 return fmt.Errorf("Failed to create cluster data proxy")
141 }
142 core.localDataProxy, err = core.localDataRoot.CreateProxy(context.Background(), "/", false)
143 if err != nil {
144 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
145 return fmt.Errorf("Failed to create local cluster data proxy")
146 }
Stephane Barbariea75791c2019-01-24 10:58:06 -0500147 core.genericMgr = newModelProxyManager(core.clusterDataProxy)
npujar03b018e2019-11-13 15:29:36 +0530148 core.deviceMgr = newDeviceManager(core.clusterDataProxy, core.instanceID)
Stephane Barbariea75791c2019-01-24 10:58:06 -0500149 core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.clusterDataProxy)
150 go core.startDeviceManager(ctx)
151 go core.startLogicalDeviceManager(ctx)
152 go core.startGRPCService(ctx)
Girish Kumar91482642019-11-08 11:38:03 +0000153 go core.monitorKvstoreLiveness(ctx)
Stephane Barbariea75791c2019-01-24 10:58:06 -0500154
155 log.Info("adaptercore-started")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530156 return nil
Stephane Barbariea75791c2019-01-24 10:58:06 -0500157}
158
npujar03b018e2019-11-13 15:29:36 +0530159// Stop will stop core services
Stephane Barbariea75791c2019-01-24 10:58:06 -0500160func (core *Core) Stop(ctx context.Context) {
161 log.Info("stopping-adaptercore")
David Bainbridgef794fc52019-10-03 22:37:12 +0000162 if core.exitChannel != nil {
163 core.exitChannel <- 1
164 }
Stephane Barbariea75791c2019-01-24 10:58:06 -0500165 // Stop all the started services
David Bainbridgef794fc52019-10-03 22:37:12 +0000166 if core.grpcServer != nil {
167 core.grpcServer.Stop()
168 }
169 if core.logicalDeviceMgr != nil {
170 core.logicalDeviceMgr.stop(ctx)
171 }
172 if core.deviceMgr != nil {
173 core.deviceMgr.stop(ctx)
174 }
Stephane Barbariea75791c2019-01-24 10:58:06 -0500175 log.Info("adaptercore-stopped")
176}
177
178//startGRPCService creates the grpc service handlers, registers it to the grpc server
179// and starts the server
180func (core *Core) startGRPCService(ctx context.Context) {
181 // create an insecure gserver server
Girish Kumar91482642019-11-08 11:38:03 +0000182 core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
Stephane Barbariea75791c2019-01-24 10:58:06 -0500183 log.Info("grpc-server-created")
184
185 core.grpcNBIAPIHandler = NewAPIHandler(core.genericMgr, core.deviceMgr, core.logicalDeviceMgr)
186 core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
187 // Create a function to register the core GRPC service with the GRPC server
188 f := func(gs *grpc.Server) {
189 voltha.RegisterVolthaServiceServer(
190 gs,
191 core.grpcNBIAPIHandler,
192 )
193 }
194
195 core.grpcServer.AddService(f)
196 log.Info("grpc-service-added")
197
Hardik Windlassdc63dde2019-09-30 07:15:13 +0000198 /*
199 * Start the GRPC server
200 *
201 * This is a bit sub-optimal here as the grpcServer.Start call does not return (blocks)
202 * until something fails, but we want to send a "start" status update. As written this
203 * means that we are actually sending the "start" status update before the server is
204 * started, which means it is possible that the status is "running" before it actually is.
205 *
206 * This means that there is a small window in which the core could return its status as
207 * ready, when it really isn't.
208 */
209 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
210
Stephane Barbariea75791c2019-01-24 10:58:06 -0500211 // Start the server
Stephane Barbariea75791c2019-01-24 10:58:06 -0500212 log.Info("grpc-server-started")
Hardik Windlassdc63dde2019-09-30 07:15:13 +0000213 core.grpcServer.Start(context.Background())
214
215 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
Stephane Barbariea75791c2019-01-24 10:58:06 -0500216}
217
// startDeviceManager starts the device manager; start() blocks until the
// manager stops, which is why Start() launches this in its own goroutine.
func (core *Core) startDeviceManager(ctx context.Context) {
	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
	// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
	// logicaldevicemanager to initiate the creation of logical devices
	log.Info("starting-DeviceManager")
	core.deviceMgr.start(ctx, core.logicalDeviceMgr)
	log.Info("started-DeviceManager")
}
226
// startLogicalDeviceManager starts the logical device manager; start() blocks
// until the manager stops, which is why Start() launches this in a goroutine.
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
	log.Info("starting-Logical-DeviceManager")
	core.logicalDeviceMgr.start(ctx)
	log.Info("started-Logical-DeviceManager")
}
Girish Kumar91482642019-11-08 11:38:03 +0000232
233/*
234* Thread to monitor kvstore Liveness (connection status)
235*
236* This function constantly monitors Liveness State of kvstore as reported
237* periodically by backend and updates the Status of kv-store service registered
238* with ro_core probe.
239*
240* If no liveness event has been seen within a timeout, then the thread will make
241* an trigger a "liveness" check, which will in turn trigger a liveness event on
242* the liveness channel, true or false depending on whether the attempt succeeded.
243*
244* The gRPC server in turn monitors the state of the readiness probe and will
245* start issuing UNAVAILABLE response while the probe is not ready.
246 */
247func (core *Core) monitorKvstoreLiveness(ctx context.Context) {
248 log.Info("start-monitoring-kvstore-liveness")
249
250 // Instruct backend to create Liveness channel for transporting state updates
251 livenessChannel := core.backend.EnableLivenessChannel()
252
253 log.Debug("enabled-kvstore-liveness-channel")
254
255 // Default state for kvstore is not alive
256 timeout := core.config.NotLiveProbeInterval
257 for {
258 timeoutTimer := time.NewTimer(timeout)
259 select {
260
261 case liveness := <-livenessChannel:
262 log.Debugw("received-liveness-change-notification", log.Fields{"liveness": liveness})
263
264 if !liveness {
265 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
266
267 if core.grpcServer != nil {
268 log.Info("kvstore-set-server-notready")
269 }
270
271 timeout = core.config.NotLiveProbeInterval
272 } else {
273 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
274
275 if core.grpcServer != nil {
276 log.Info("kvstore-set-server-ready")
277 }
278
279 timeout = core.config.LiveProbeInterval
280 }
281
282 if !timeoutTimer.Stop() {
283 <-timeoutTimer.C
284 }
285
286 case <-timeoutTimer.C:
287 log.Info("kvstore-perform-liveness-check-on-timeout")
288
289 // Trigger Liveness check if no liveness update received within the timeout period.
290 // The Liveness check will push Live state to same channel which this routine is
291 // reading and processing. This, do it asynchronously to avoid blocking for
292 // backend response and avoid any possibility of deadlock
293 go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
294 }
295 }
296}