blob: 01543ff248cb0b7e21b4c535be7a1ddaf9b70cad [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
npujar1d86a522019-11-14 17:11:16 +053021 "time"
22
sbarbari17d7e222019-11-05 10:02:29 -050023 "github.com/opencord/voltha-go/db/model"
khenaidoob9203542018-09-17 22:56:37 -040024 "github.com/opencord/voltha-go/rw_core/config"
Kent Hagerman2b216042020-04-03 18:28:56 -040025 "github.com/opencord/voltha-go/rw_core/core/adapter"
26 "github.com/opencord/voltha-go/rw_core/core/api"
27 "github.com/opencord/voltha-go/rw_core/core/device"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040028 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080029 grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
30 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
32 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
33 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040034 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040035)
36
// Core is the handle to a running read-write core. It holds only what is
// needed to shut the core down and to wait for that shutdown to complete.
type Core struct {
	// shutdown cancels the context under which the core's services run.
	shutdown context.CancelFunc

	// stopped is closed once the core's start routine has returned.
	stopped chan struct{}
}
42
npujar1d86a522019-11-14 17:11:16 +053043// NewCore creates instance of rw core
Kent Hagerman2f0d0552020-04-23 17:28:52 -040044func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070045 // If the context has a probe then fetch it and register our services
Kent Hagerman2f0d0552020-04-23 17:28:52 -040046 if p := probe.GetProbeFromContext(ctx); p != nil {
47 p.RegisterService(
Rohan Agrawal31f21802020-06-12 05:38:46 +000048 ctx,
Kent Hagerman2f0d0552020-04-23 17:28:52 -040049 "message-bus",
50 "kv-store",
51 "adapter-manager",
52 "grpc-service",
khenaidoob9203542018-09-17 22:56:37 -040053 )
54 }
55
Kent Hagerman2f0d0552020-04-23 17:28:52 -040056 // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
57 shutdownCtx, cancelCtx := context.WithCancel(ctx)
58
59 core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})}
60 go core.start(shutdownCtx, id, cf)
61 return core
62}
63
64func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000065 logger.Info(ctx, "starting-core-services", log.Fields{"coreId": id})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040066
67 // deferred functions are used to run cleanup
68 // failing partway will stop anything that's been started
69 defer close(core.stopped)
70 defer core.shutdown()
71
Rohan Agrawal31f21802020-06-12 05:38:46 +000072 logger.Info(ctx, "Starting RW Core components")
Kent Hagerman2f0d0552020-04-23 17:28:52 -040073
74 // setup kv client
Rohan Agrawal31f21802020-06-12 05:38:46 +000075 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
76 kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040077 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000078 logger.Fatal(ctx, err)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040079 }
Rohan Agrawalcf12f202020-08-03 04:42:01 +000080 defer stopKVClient(log.WithSpanFromContext(context.Background(), ctx), kvClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040081
82 // sync logging config with kv store
Rohan Agrawal31f21802020-06-12 05:38:46 +000083 cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040084 go conf.StartLogLevelConfigProcessing(cm, ctx)
85
serkant.uluderya8ff291d2020-05-20 00:58:00 -070086 backend := cm.Backend
87 backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
Kent Hagerman2f0d0552020-04-23 17:28:52 -040088
89 // wait until connection to KV Store is up
90 if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000091 logger.Fatal(ctx, "Unable-to-connect-to-KV-store")
Kent Hagerman2f0d0552020-04-23 17:28:52 -040092 }
93 go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
94
95 // create kafka client
96 kafkaClient := kafka.NewSaramaClient(
Neha Sharmad1387da2020-05-07 20:07:28 +000097 kafka.Address(cf.KafkaAdapterAddress),
Kent Hagerman2f0d0552020-04-23 17:28:52 -040098 kafka.ConsumerType(kafka.GroupCustomer),
99 kafka.ProducerReturnOnErrors(true),
100 kafka.ProducerReturnOnSuccess(true),
101 kafka.ProducerMaxRetries(6),
102 kafka.NumPartitions(3),
103 kafka.ConsumerGroupName(id),
104 kafka.ConsumerGroupPrefix(id),
105 kafka.AutoCreateTopic(true),
106 kafka.ProducerFlushFrequency(5),
107 kafka.ProducerRetryBackoff(time.Millisecond*30),
108 kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
109 )
110 // defer kafkaClient.Stop()
111
Kent Hagermanf5a67352020-04-30 15:15:26 -0400112 // create kv path
113 dbPath := model.NewDBPath(backend)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400114
115 // load adapters & device types while other things are starting
Rohan Agrawal31f21802020-06-12 05:38:46 +0000116 adapterMgr := adapter.NewAdapterManager(ctx, dbPath, id, kafkaClient)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400117 go adapterMgr.Start(ctx)
118
119 // connect to kafka, then wait until reachable and publisher/consumer created
120 // core.kmp must be created before deviceMgr and adapterMgr
David Bainbridge9ae13132020-06-22 17:28:01 -0700121 kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400122 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000123 logger.Warn(ctx, "Failed to setup kafka connection")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400124 return
125 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 defer kmp.Stop(ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400127 go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
128
129 // create the core of the system, the device managers
130 endpointMgr := kafka.NewEndpointManager(backend)
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700131 deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400132
133 // register kafka RPC handler
Rohan Agrawal31f21802020-06-12 05:38:46 +0000134 registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400135
136 // start gRPC handler
Neha Sharmad1387da2020-05-07 20:07:28 +0000137 grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400138 go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
139 defer grpcServer.Stop()
140
141 // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
142 <-ctx.Done()
143}
144
145// Stop brings down core services
146func (core *Core) Stop() {
147 core.shutdown()
148 <-core.stopped
149}
150
151// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
152func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000153 logger.Info(ctx, "grpc-server-created")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400154
155 server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000156 logger.Info(ctx, "grpc-service-added")
khenaidoob9203542018-09-17 22:56:37 -0400157
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700158 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000159 logger.Info(ctx, "grpc-server-started")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400160 // Note that there is a small window here in which the core could return its status as ready,
161 // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
162 server.Start(ctx)
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700163 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
khenaidoob9203542018-09-17 22:56:37 -0400164}