blob: c81141b05aa36aa2f90d2a7f67db2f3313330e5f [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 grpcserver "github.com/opencord/voltha-go/common/grpc"
21 "github.com/opencord/voltha-go/common/log"
Richard Jankowskie4d77662018-10-17 13:53:21 -040022 "github.com/opencord/voltha-go/db/kvstore"
khenaidoob9203542018-09-17 22:56:37 -040023 "github.com/opencord/voltha-go/db/model"
24 "github.com/opencord/voltha-go/kafka"
25 "github.com/opencord/voltha-go/protos/voltha"
26 "github.com/opencord/voltha-go/rw_core/config"
27 "google.golang.org/grpc"
khenaidoob9203542018-09-17 22:56:37 -040028)
29
30type Core struct {
31 instanceId string
32 deviceMgr *DeviceManager
33 logicalDeviceMgr *LogicalDeviceManager
34 grpcServer *grpcserver.GrpcServer
Richard Jankowskidbab94a2018-12-06 16:20:25 -050035 grpcNBIAPIHandler *APIHandler
khenaidoo21d51152019-02-01 13:48:37 -050036 adapterMgr *AdapterManager
khenaidoob9203542018-09-17 22:56:37 -040037 config *config.RWCoreFlags
khenaidoo43c82122018-11-22 18:38:28 -050038 kmp *kafka.InterContainerProxy
khenaidoo92e62c52018-10-03 14:02:54 -040039 clusterDataRoot model.Root
40 localDataRoot model.Root
khenaidoob9203542018-09-17 22:56:37 -040041 clusterDataProxy *model.Proxy
42 localDataProxy *model.Proxy
43 exitChannel chan int
Richard Jankowskie4d77662018-10-17 13:53:21 -040044 kvClient kvstore.Client
khenaidoo43c82122018-11-22 18:38:28 -050045 kafkaClient kafka.Client
khenaidoob9203542018-09-17 22:56:37 -040046}
47
48func init() {
49 log.AddPackage(log.JSON, log.WarnLevel, nil)
50}
51
khenaidoo43c82122018-11-22 18:38:28 -050052func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
khenaidoob9203542018-09-17 22:56:37 -040053 var core Core
54 core.instanceId = id
55 core.exitChannel = make(chan int, 1)
56 core.config = cf
Richard Jankowskie4d77662018-10-17 13:53:21 -040057 core.kvClient = kvClient
khenaidoo43c82122018-11-22 18:38:28 -050058 core.kafkaClient = kafkaClient
Richard Jankowskie4d77662018-10-17 13:53:21 -040059
60 // Setup the KV store
61 // Do not call NewBackend constructor; it creates its own KV client
khenaidoo91ecfd62018-11-04 17:13:42 -050062 // Commented the backend for now until the issue between the model and the KV store
63 // is resolved.
khenaidoo7ccedd52018-12-14 16:48:54 -050064 backend := model.Backend{
65 Client: kvClient,
66 StoreType: cf.KVStoreType,
67 Host: cf.KVStoreHost,
68 Port: cf.KVStorePort,
69 Timeout: cf.KVStoreTimeout,
khenaidoo9cdc1a62019-01-24 21:57:40 -050070 PathPrefix: cf.KVStoreDataPrefix}
khenaidoo7ccedd52018-12-14 16:48:54 -050071 core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
72 core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
khenaidoo43c82122018-11-22 18:38:28 -050073 core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
74 core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
khenaidoob9203542018-09-17 22:56:37 -040075 return &core
76}
77
78func (core *Core) Start(ctx context.Context) {
khenaidoo19374072018-12-11 11:05:15 -050079 log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
khenaidoo9cdc1a62019-01-24 21:57:40 -050080 if err := core.startKafkaMessagingProxy(ctx); err != nil {
81 log.Fatal("Failure-starting-kafkaMessagingProxy")
82 }
khenaidoob9203542018-09-17 22:56:37 -040083 log.Info("values", log.Fields{"kmp": core.kmp})
khenaidoo21d51152019-02-01 13:48:37 -050084 core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
85 core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
khenaidoo4d4802d2018-10-04 21:59:49 -040086 core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
khenaidoo21d51152019-02-01 13:48:37 -050087 if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
khenaidoo9cdc1a62019-01-24 21:57:40 -050088 log.Fatal("Failure-registering-adapterRequestHandler")
89 }
khenaidoob9203542018-09-17 22:56:37 -040090 go core.startDeviceManager(ctx)
91 go core.startLogicalDeviceManager(ctx)
92 go core.startGRPCService(ctx)
khenaidoo21d51152019-02-01 13:48:37 -050093 go core.startAdapterManager(ctx)
khenaidoob9203542018-09-17 22:56:37 -040094
khenaidoo19374072018-12-11 11:05:15 -050095 log.Info("adaptercore-started")
khenaidoob9203542018-09-17 22:56:37 -040096}
97
98func (core *Core) Stop(ctx context.Context) {
khenaidoo19374072018-12-11 11:05:15 -050099 log.Info("stopping-adaptercore")
khenaidoob9203542018-09-17 22:56:37 -0400100 core.exitChannel <- 1
khenaidoo43c82122018-11-22 18:38:28 -0500101 // Stop all the started services
102 core.grpcServer.Stop()
103 core.logicalDeviceMgr.stop(ctx)
104 core.deviceMgr.stop(ctx)
105 core.kmp.Stop()
khenaidoo19374072018-12-11 11:05:15 -0500106 log.Info("adaptercore-stopped")
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
khenaidoo92e62c52018-10-03 14:02:54 -0400109//startGRPCService creates the grpc service handlers, registers it to the grpc server
khenaidoob9203542018-09-17 22:56:37 -0400110// and starts the server
111func (core *Core) startGRPCService(ctx context.Context) {
112 // create an insecure gserver server
113 core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
114 log.Info("grpc-server-created")
115
khenaidoo21d51152019-02-01 13:48:37 -0500116 core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
Richard Jankowskidbab94a2018-12-06 16:20:25 -0500117 core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
khenaidoob9203542018-09-17 22:56:37 -0400118 // Create a function to register the core GRPC service with the GRPC server
119 f := func(gs *grpc.Server) {
120 voltha.RegisterVolthaServiceServer(
121 gs,
Richard Jankowskidbab94a2018-12-06 16:20:25 -0500122 core.grpcNBIAPIHandler,
khenaidoob9203542018-09-17 22:56:37 -0400123 )
124 }
125
126 core.grpcServer.AddService(f)
127 log.Info("grpc-service-added")
128
129 // Start the server
130 core.grpcServer.Start(context.Background())
131 log.Info("grpc-server-started")
132}
133
134func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
135 log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
136 "port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
137 var err error
khenaidoo43c82122018-11-22 18:38:28 -0500138 if core.kmp, err = kafka.NewInterContainerProxy(
139 kafka.InterContainerHost(core.config.KafkaAdapterHost),
140 kafka.InterContainerPort(core.config.KafkaAdapterPort),
141 kafka.MsgClient(core.kafkaClient),
khenaidoo79232702018-12-04 11:00:41 -0500142 kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
143 kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400144 log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
145 return err
146 }
147
148 if err = core.kmp.Start(); err != nil {
149 log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
150 return err
151 }
152
153 log.Info("kafka-messaging-proxy-created")
154 return nil
155}
156
khenaidoo91ecfd62018-11-04 17:13:42 -0500157func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
khenaidoo21d51152019-02-01 13:48:37 -0500158 aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
159 requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
khenaidoo43c82122018-11-22 18:38:28 -0500160 core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
khenaidoob9203542018-09-17 22:56:37 -0400161
khenaidoo92e62c52018-10-03 14:02:54 -0400162 log.Info("request-handlers")
khenaidoob9203542018-09-17 22:56:37 -0400163 return nil
164}
165
166func (core *Core) startDeviceManager(ctx context.Context) {
167 // TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
168 // callbacks. For now, until the model is ready, devicemanager will keep a reference to the
169 // logicaldevicemanager to initiate the creation of logical devices
khenaidoo21d51152019-02-01 13:48:37 -0500170 log.Info("DeviceManager-Starting...")
khenaidoo4d4802d2018-10-04 21:59:49 -0400171 core.deviceMgr.start(ctx, core.logicalDeviceMgr)
khenaidoo21d51152019-02-01 13:48:37 -0500172 log.Info("DeviceManager-Started")
khenaidoob9203542018-09-17 22:56:37 -0400173}
174
175func (core *Core) startLogicalDeviceManager(ctx context.Context) {
khenaidoo21d51152019-02-01 13:48:37 -0500176 log.Info("Logical-DeviceManager-Starting...")
khenaidoo4d4802d2018-10-04 21:59:49 -0400177 core.logicalDeviceMgr.start(ctx)
khenaidoo21d51152019-02-01 13:48:37 -0500178 log.Info("Logical-DeviceManager-Started")
khenaidoob9203542018-09-17 22:56:37 -0400179}
khenaidoo21d51152019-02-01 13:48:37 -0500180
181func (core *Core) startAdapterManager(ctx context.Context) {
182 log.Info("Adapter-Manager-Starting...")
183 core.adapterMgr.start(ctx)
184 log.Info("Adapter-Manager-Started")
185}