blob: d576a6e4d3086000611789ae647ee03d8b5b3ec2 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
khenaidoo54e0ddf2019-02-27 16:21:33 -050020 "errors"
21 "fmt"
khenaidoob9203542018-09-17 22:56:37 -040022 grpcserver "github.com/opencord/voltha-go/common/grpc"
23 "github.com/opencord/voltha-go/common/log"
Richard Jankowskie4d77662018-10-17 13:53:21 -040024 "github.com/opencord/voltha-go/db/kvstore"
khenaidoob9203542018-09-17 22:56:37 -040025 "github.com/opencord/voltha-go/db/model"
26 "github.com/opencord/voltha-go/kafka"
27 "github.com/opencord/voltha-go/protos/voltha"
28 "github.com/opencord/voltha-go/rw_core/config"
29 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040030)
31
32type Core struct {
33 instanceId string
34 deviceMgr *DeviceManager
35 logicalDeviceMgr *LogicalDeviceManager
36 grpcServer *grpcserver.GrpcServer
Richard Jankowskidbab94a2018-12-06 16:20:25 -050037 grpcNBIAPIHandler *APIHandler
khenaidoo21d51152019-02-01 13:48:37 -050038 adapterMgr *AdapterManager
khenaidoob9203542018-09-17 22:56:37 -040039 config *config.RWCoreFlags
khenaidoo43c82122018-11-22 18:38:28 -050040 kmp *kafka.InterContainerProxy
khenaidoo92e62c52018-10-03 14:02:54 -040041 clusterDataRoot model.Root
42 localDataRoot model.Root
khenaidoob9203542018-09-17 22:56:37 -040043 clusterDataProxy *model.Proxy
44 localDataProxy *model.Proxy
45 exitChannel chan int
Richard Jankowskie4d77662018-10-17 13:53:21 -040046 kvClient kvstore.Client
khenaidoo43c82122018-11-22 18:38:28 -050047 kafkaClient kafka.Client
khenaidoo54e0ddf2019-02-27 16:21:33 -050048 coreMembershipRegistered bool
khenaidoob9203542018-09-17 22:56:37 -040049}
50
51func init() {
52 log.AddPackage(log.JSON, log.WarnLevel, nil)
53}
54
khenaidoo43c82122018-11-22 18:38:28 -050055func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
khenaidoob9203542018-09-17 22:56:37 -040056 var core Core
57 core.instanceId = id
58 core.exitChannel = make(chan int, 1)
59 core.config = cf
Richard Jankowskie4d77662018-10-17 13:53:21 -040060 core.kvClient = kvClient
khenaidoo43c82122018-11-22 18:38:28 -050061 core.kafkaClient = kafkaClient
Richard Jankowskie4d77662018-10-17 13:53:21 -040062
63 // Setup the KV store
64 // Do not call NewBackend constructor; it creates its own KV client
khenaidoo91ecfd62018-11-04 17:13:42 -050065 // Commented the backend for now until the issue between the model and the KV store
66 // is resolved.
khenaidoo7ccedd52018-12-14 16:48:54 -050067 backend := model.Backend{
68 Client: kvClient,
69 StoreType: cf.KVStoreType,
70 Host: cf.KVStoreHost,
71 Port: cf.KVStorePort,
72 Timeout: cf.KVStoreTimeout,
khenaidoo9cdc1a62019-01-24 21:57:40 -050073 PathPrefix: cf.KVStoreDataPrefix}
khenaidoo7ccedd52018-12-14 16:48:54 -050074 core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
75 core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
khenaidoo43c82122018-11-22 18:38:28 -050076 core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
77 core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
khenaidoo54e0ddf2019-02-27 16:21:33 -050078 core.coreMembershipRegistered = false
khenaidoob9203542018-09-17 22:56:37 -040079 return &core
80}
81
82func (core *Core) Start(ctx context.Context) {
khenaidoo19374072018-12-11 11:05:15 -050083 log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
khenaidoo9cdc1a62019-01-24 21:57:40 -050084 if err := core.startKafkaMessagingProxy(ctx); err != nil {
85 log.Fatal("Failure-starting-kafkaMessagingProxy")
86 }
khenaidoob9203542018-09-17 22:56:37 -040087 log.Info("values", log.Fields{"kmp": core.kmp})
khenaidoo21d51152019-02-01 13:48:37 -050088 core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
89 core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
khenaidoo4d4802d2018-10-04 21:59:49 -040090 core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
khenaidoo54e0ddf2019-02-27 16:21:33 -050091
92 if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
khenaidoo9cdc1a62019-01-24 21:57:40 -050093 log.Fatal("Failure-registering-adapterRequestHandler")
94 }
khenaidoob9203542018-09-17 22:56:37 -040095 go core.startDeviceManager(ctx)
96 go core.startLogicalDeviceManager(ctx)
97 go core.startGRPCService(ctx)
khenaidoo21d51152019-02-01 13:48:37 -050098 go core.startAdapterManager(ctx)
khenaidoob9203542018-09-17 22:56:37 -040099
khenaidoo19374072018-12-11 11:05:15 -0500100 log.Info("adaptercore-started")
khenaidoob9203542018-09-17 22:56:37 -0400101}
102
103func (core *Core) Stop(ctx context.Context) {
khenaidoo19374072018-12-11 11:05:15 -0500104 log.Info("stopping-adaptercore")
khenaidoob9203542018-09-17 22:56:37 -0400105 core.exitChannel <- 1
khenaidoo43c82122018-11-22 18:38:28 -0500106 // Stop all the started services
107 core.grpcServer.Stop()
108 core.logicalDeviceMgr.stop(ctx)
109 core.deviceMgr.stop(ctx)
110 core.kmp.Stop()
khenaidoo19374072018-12-11 11:05:15 -0500111 log.Info("adaptercore-stopped")
khenaidoob9203542018-09-17 22:56:37 -0400112}
113
khenaidoo92e62c52018-10-03 14:02:54 -0400114//startGRPCService creates the grpc service handlers, registers it to the grpc server
khenaidoob9203542018-09-17 22:56:37 -0400115// and starts the server
116func (core *Core) startGRPCService(ctx context.Context) {
117 // create an insecure gserver server
118 core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
119 log.Info("grpc-server-created")
120
khenaidoo54e0ddf2019-02-27 16:21:33 -0500121 //core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
122 core.grpcNBIAPIHandler = NewAPIHandler(core)
Richard Jankowskidbab94a2018-12-06 16:20:25 -0500123 core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
khenaidoob9203542018-09-17 22:56:37 -0400124 // Create a function to register the core GRPC service with the GRPC server
125 f := func(gs *grpc.Server) {
126 voltha.RegisterVolthaServiceServer(
127 gs,
Richard Jankowskidbab94a2018-12-06 16:20:25 -0500128 core.grpcNBIAPIHandler,
khenaidoob9203542018-09-17 22:56:37 -0400129 )
130 }
131
132 core.grpcServer.AddService(f)
133 log.Info("grpc-service-added")
134
135 // Start the server
136 core.grpcServer.Start(context.Background())
137 log.Info("grpc-server-started")
138}
139
140func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
141 log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
142 "port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
143 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500144 if core.kmp, err = kafka.NewInterContainerProxy(
145 kafka.InterContainerHost(core.config.KafkaAdapterHost),
146 kafka.InterContainerPort(core.config.KafkaAdapterPort),
147 kafka.MsgClient(core.kafkaClient),
khenaidoo79232702018-12-04 11:00:41 -0500148 kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
149 kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400150 log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
151 return err
152 }
153
154 if err = core.kmp.Start(); err != nil {
155 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
156 return err
157 }
158
159 log.Info("kafka-messaging-proxy-created")
160 return nil
161}
162
khenaidoo54e0ddf2019-02-27 16:21:33 -0500163//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
164// ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
165// ) error {
166// requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
167// core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
168// core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
169//
170// log.Info("request-handlers")
171// return nil
172//}
173
174func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
khenaidoo297cd252019-02-07 22:10:23 -0500175 ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
khenaidoo54e0ddf2019-02-27 16:21:33 -0500176) error {
khenaidoo297cd252019-02-07 22:10:23 -0500177 requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
178 core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
khenaidoob9203542018-09-17 22:56:37 -0400179
khenaidoo54e0ddf2019-02-27 16:21:33 -0500180 // Register the broadcast topic to handle any core-bound broadcast requests
181 if err := core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy); err != nil {
182 log.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": core.config.CoreTopic})
183 return err
184 }
185
186 log.Info("request-handler-registered")
khenaidoob9203542018-09-17 22:56:37 -0400187 return nil
188}
189
khenaidoo54e0ddf2019-02-27 16:21:33 -0500190
191func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
192 // Sanity check - can only register once
193 if core.coreMembershipRegistered {
194 return errors.New("Can-only-set-once")
195 }
196
197 go func(coreTopicSuffix string) {
198 // Register the core-pair topic to handle core-bound requests aimed to the core pair
199 pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
200 // TODO: Set a retry here to ensure this subscription happens
201 if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
202 log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
203 }
204 // Update the CoreTopic to use by the adapter proxy
205 core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
206 }(coreTopicSuffix)
207
208 core.coreMembershipRegistered = true
209 log.Info("request-handlers-registered")
210 return nil
211}
212
213
khenaidoob9203542018-09-17 22:56:37 -0400214func (core *Core) startDeviceManager(ctx context.Context) {
215 // TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
216 // callbacks. For now, until the model is ready, devicemanager will keep a reference to the
217 // logicaldevicemanager to initiate the creation of logical devices
khenaidoo21d51152019-02-01 13:48:37 -0500218 log.Info("DeviceManager-Starting...")
khenaidoo4d4802d2018-10-04 21:59:49 -0400219 core.deviceMgr.start(ctx, core.logicalDeviceMgr)
khenaidoo21d51152019-02-01 13:48:37 -0500220 log.Info("DeviceManager-Started")
khenaidoob9203542018-09-17 22:56:37 -0400221}
222
223func (core *Core) startLogicalDeviceManager(ctx context.Context) {
khenaidoo21d51152019-02-01 13:48:37 -0500224 log.Info("Logical-DeviceManager-Starting...")
khenaidoo4d4802d2018-10-04 21:59:49 -0400225 core.logicalDeviceMgr.start(ctx)
khenaidoo21d51152019-02-01 13:48:37 -0500226 log.Info("Logical-DeviceManager-Started")
khenaidoob9203542018-09-17 22:56:37 -0400227}
khenaidoo21d51152019-02-01 13:48:37 -0500228
229func (core *Core) startAdapterManager(ctx context.Context) {
230 log.Info("Adapter-Manager-Starting...")
231 core.adapterMgr.start(ctx)
232 log.Info("Adapter-Manager-Started")
233}