blob: 0dbecc844009f7fedd1eaf88747b46b11004a352 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040021 "strconv"
npujar1d86a522019-11-14 17:11:16 +053022 "time"
23
sbarbari17d7e222019-11-05 10:02:29 -050024 "github.com/opencord/voltha-go/db/model"
khenaidoob9203542018-09-17 22:56:37 -040025 "github.com/opencord/voltha-go/rw_core/config"
Kent Hagerman2b216042020-04-03 18:28:56 -040026 "github.com/opencord/voltha-go/rw_core/core/adapter"
27 "github.com/opencord/voltha-go/rw_core/core/api"
28 "github.com/opencord/voltha-go/rw_core/core/device"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040029 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080030 "github.com/opencord/voltha-lib-go/v3/pkg/db"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
32 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
35 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040036 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040037)
38
npujar1d86a522019-11-14 17:11:16 +053039// Core represent read,write core attributes
khenaidoob9203542018-09-17 22:56:37 -040040type Core struct {
Kent Hagerman2f0d0552020-04-23 17:28:52 -040041 shutdown context.CancelFunc
42 stopped chan struct{}
khenaidoob9203542018-09-17 22:56:37 -040043}
44
npujar1d86a522019-11-14 17:11:16 +053045// NewCore creates instance of rw core
Kent Hagerman2f0d0552020-04-23 17:28:52 -040046func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -070047 // If the context has a probe then fetch it and register our services
Kent Hagerman2f0d0552020-04-23 17:28:52 -040048 if p := probe.GetProbeFromContext(ctx); p != nil {
49 p.RegisterService(
50 "message-bus",
51 "kv-store",
52 "adapter-manager",
53 "grpc-service",
khenaidoob9203542018-09-17 22:56:37 -040054 )
55 }
56
Kent Hagerman2f0d0552020-04-23 17:28:52 -040057 // new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
58 shutdownCtx, cancelCtx := context.WithCancel(ctx)
59
60 core := &Core{shutdown: cancelCtx, stopped: make(chan struct{})}
61 go core.start(shutdownCtx, id, cf)
62 return core
63}
64
65func (core *Core) start(ctx context.Context, id string, cf *config.RWCoreFlags) {
66 logger.Info("starting-core-services", log.Fields{"coreId": id})
67
68 // deferred functions are used to run cleanup
69 // failing partway will stop anything that's been started
70 defer close(core.stopped)
71 defer core.shutdown()
72
73 logger.Info("Starting RW Core components")
74
75 // setup kv client
76 logger.Debugw("create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
77 kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
78 if err != nil {
79 logger.Fatal(err)
80 }
81 defer stopKVClient(context.Background(), kvClient)
82
83 // sync logging config with kv store
84 cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
85 go conf.StartLogLevelConfigProcessing(cm, ctx)
86
87 backend := &db.Backend{
88 Client: kvClient,
89 StoreType: cf.KVStoreType,
90 Host: cf.KVStoreHost,
91 Port: cf.KVStorePort,
92 Timeout: cf.KVStoreTimeout,
93 // Configure backend to push Liveness Status at least every (cf.LiveProbeInterval / 2) seconds
94 // so as to avoid trigger of Liveness check (due to Liveness timeout) when backend is alive
95 LivenessChannelInterval: cf.LiveProbeInterval / 2,
96 PathPrefix: cf.KVStoreDataPrefix,
97 }
98
99 // wait until connection to KV Store is up
100 if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
101 logger.Fatal("Unable-to-connect-to-KV-store")
102 }
103 go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
104
105 // create kafka client
106 kafkaClient := kafka.NewSaramaClient(
107 kafka.Host(cf.KafkaAdapterHost),
108 kafka.Port(cf.KafkaAdapterPort),
109 kafka.ConsumerType(kafka.GroupCustomer),
110 kafka.ProducerReturnOnErrors(true),
111 kafka.ProducerReturnOnSuccess(true),
112 kafka.ProducerMaxRetries(6),
113 kafka.NumPartitions(3),
114 kafka.ConsumerGroupName(id),
115 kafka.ConsumerGroupPrefix(id),
116 kafka.AutoCreateTopic(true),
117 kafka.ProducerFlushFrequency(5),
118 kafka.ProducerRetryBackoff(time.Millisecond*30),
119 kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
120 )
121 // defer kafkaClient.Stop()
122
123 // create kv proxy
124 proxy := model.NewProxy(backend, "/")
125
126 // load adapters & device types while other things are starting
127 adapterMgr := adapter.NewAdapterManager(proxy, id, kafkaClient)
128 go adapterMgr.Start(ctx)
129
130 // connect to kafka, then wait until reachable and publisher/consumer created
131 // core.kmp must be created before deviceMgr and adapterMgr
132 kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterHost, cf.KafkaAdapterPort, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval)
133 if err != nil {
134 logger.Warn("Failed to setup kafka connection")
135 return
136 }
137 defer kmp.Stop()
138 go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
139
140 // create the core of the system, the device managers
141 endpointMgr := kafka.NewEndpointManager(backend)
142 deviceMgr, logicalDeviceMgr := device.NewManagers(proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
143
144 // register kafka RPC handler
145 registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
146
147 // start gRPC handler
148 grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
149 go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
150 defer grpcServer.Stop()
151
152 // wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
153 <-ctx.Done()
154}
155
156// Stop brings down core services
157func (core *Core) Stop() {
158 core.shutdown()
159 <-core.stopped
160}
161
162// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
163func startGRPCService(ctx context.Context, server *grpcserver.GrpcServer, handler voltha.VolthaServiceServer) {
164 logger.Info("grpc-server-created")
165
166 server.AddService(func(gs *grpc.Server) { voltha.RegisterVolthaServiceServer(gs, handler) })
Girish Kumarf56a4682020-03-20 20:07:46 +0000167 logger.Info("grpc-service-added")
khenaidoob9203542018-09-17 22:56:37 -0400168
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700169 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
Girish Kumarf56a4682020-03-20 20:07:46 +0000170 logger.Info("grpc-server-started")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400171 // Note that there is a small window here in which the core could return its status as ready,
172 // when it really isn't. This is unlikely to cause issues, as the delay is incredibly short.
173 server.Start(ctx)
David K. Bainbridgeb4a9ab02019-09-20 15:12:16 -0700174 probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
khenaidoob9203542018-09-17 22:56:37 -0400175}