blob: 336d36caedcc74b40ac8884a0a03f07e2ee37733 [file] [log] [blame]
cuilin20187b2a8c32019-03-26 19:52:28 -07001/*
Joey Armstrongf9bffdf2022-12-27 07:05:28 -05002* Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors
cuilin20187b2a8c32019-03-26 19:52:28 -07003
cbabu116b73f2019-12-10 17:56:32 +05304* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
cuilin20187b2a8c32019-03-26 19:52:28 -07007
cbabu116b73f2019-12-10 17:56:32 +05308* http://www.apache.org/licenses/LICENSE-2.0
cuilin20187b2a8c32019-03-26 19:52:28 -07009
cbabu116b73f2019-12-10 17:56:32 +053010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
cuilin20187b2a8c32019-03-26 19:52:28 -070015 */
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070016
17//Package main invokes the application
cuilin20187b2a8c32019-03-26 19:52:28 -070018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
nikesh.krishnan6dd882b2023-03-14 10:02:41 +053024 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 codes "google.golang.org/grpc/codes"
kdarapu381c6902019-07-31 18:23:16 +053026 "os"
27 "os/signal"
kdarapu381c6902019-07-31 18:23:16 +053028 "syscall"
29 "time"
30
khenaidoo106c61a2021-08-11 18:05:46 -040031 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
35 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
36 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v7/pkg/log"
38 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v7/pkg/version"
Scott Bakerdbd960e2020-02-28 08:57:51 -080040 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
41 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
khenaidoodc2116e2021-10-19 17:33:19 -040042 "github.com/opencord/voltha-protos/v5/go/adapter_service"
43 ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
44 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoodc2116e2021-10-19 17:33:19 -040045 "github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
khenaidoo106c61a2021-08-11 18:05:46 -040046 "github.com/opencord/voltha-protos/v5/go/voltha"
47 "google.golang.org/grpc"
48)
49
50const (
51 clusterMessagingService = "cluster-message-service"
52 oltAdapterService = "olt-adapter-service"
53 kvService = "kv-service"
54 coreService = "core-service"
cuilin20187b2a8c32019-03-26 19:52:28 -070055)
56
57type adapter struct {
khenaidooefff76e2021-12-15 16:51:30 -050058 instanceID string
59 config *config.AdapterFlags
60 grpcServer *vgrpc.GrpcServer
61 oltAdapter *ac.OpenOLT
62 oltInterAdapter *ac.OpenOLTInterAdapter
63 kafkaClient kafka.Client
64 kvClient kvstore.Client
65 coreClient *vgrpc.Client
66 eventProxy eventif.EventProxy
67 halted bool
68 exitChannel chan int
cuilin20187b2a8c32019-03-26 19:52:28 -070069}
70
cuilin20187b2a8c32019-03-26 19:52:28 -070071func newAdapter(cf *config.AdapterFlags) *adapter {
72 var a adapter
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070073 a.instanceID = cf.InstanceID
cuilin20187b2a8c32019-03-26 19:52:28 -070074 a.config = cf
75 a.halted = false
76 a.exitChannel = make(chan int, 1)
cuilin20187b2a8c32019-03-26 19:52:28 -070077 return &a
78}
79
80func (a *adapter) start(ctx context.Context) {
Neha Sharma96b7bf22020-06-15 10:37:32 +000081 logger.Info(ctx, "Starting Core Adapter components")
cuilin20187b2a8c32019-03-26 19:52:28 -070082 var err error
83
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000084 var p *probe.Probe
85 if value := ctx.Value(probe.ProbeContextKey); value != nil {
86 if _, ok := value.(*probe.Probe); ok {
87 p = value.(*probe.Probe)
88 p.RegisterService(
Neha Sharma96b7bf22020-06-15 10:37:32 +000089 ctx,
khenaidoo106c61a2021-08-11 18:05:46 -040090 clusterMessagingService,
91 kvService,
92 oltAdapterService,
93 coreService,
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000094 )
95 }
96 }
97
cuilin20187b2a8c32019-03-26 19:52:28 -070098 // Setup KV Client
Neha Sharma96b7bf22020-06-15 10:37:32 +000099 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
100 if err = a.setKVClient(ctx); err != nil {
101 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700102 }
103
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000104 if p != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400105 p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000106 }
107
divyadesaia37f78b2020-02-07 12:41:22 +0000108 // Setup Log Config
Neha Sharma96b7bf22020-06-15 10:37:32 +0000109 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800110
divyadesaid26f6b12020-03-19 06:30:28 +0000111 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumar935f7af2020-08-18 11:59:42 +0000112 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
divyadesaia37f78b2020-02-07 12:41:22 +0000113
cuilin20187b2a8c32019-03-26 19:52:28 -0700114 // Setup Kafka Client
khenaidoo106c61a2021-08-11 18:05:46 -0400115 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000116 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700117 }
118
khenaidoo106c61a2021-08-11 18:05:46 -0400119 // Start kafka communication with the broker
120 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
121 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000122 }
123
Devmalya Paulfb990a52019-07-09 10:01:49 -0400124 // Create the event proxy to post events to KAFKA
Himani Chawlacd407802020-12-10 12:08:59 +0530125 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo106c61a2021-08-11 18:05:46 -0400126 go func() {
127 if err := a.eventProxy.Start(); err != nil {
128 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
129 }
130 }()
131
132 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
133 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo27e7ac92021-12-08 14:43:09 -0500134 if a.coreClient, err = vgrpc.NewClient(
135 a.config.AdapterEndpoint,
136 a.config.CoreEndpoint,
khenaidooefff76e2021-12-15 16:51:30 -0500137 "core_service.CoreService",
khenaidoo27e7ac92021-12-08 14:43:09 -0500138 a.coreRestarted); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400139 logger.Fatal(ctx, "grpc-client-not-created")
140 }
141 // Start the core grpc client
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530142 retryCodes := []codes.Code{
143 codes.Unavailable, // server is currently unavailable
144 codes.DeadlineExceeded, // deadline for the operation was exceeded
145 }
nikesh.krishnan97e74d22023-06-28 13:54:01 +0530146 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
147 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
148 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530149 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
150 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Devmalya Paulfb990a52019-07-09 10:01:49 -0400151
cuilin20187b2a8c32019-03-26 19:52:28 -0700152 // Create the open OLT adapter
khenaidoo106c61a2021-08-11 18:05:46 -0400153 if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000154 logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700155 }
156
khenaidooefff76e2021-12-15 16:51:30 -0500157 // Create the open OLT Inter adapter adapter
158 if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
159 logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
160 }
161
khenaidoo106c61a2021-08-11 18:05:46 -0400162 // Create and start the grpc server
163 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
164
165 //Register the adapter service
166 a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
167
168 //Register the olt inter-adapter service
khenaidooefff76e2021-12-15 16:51:30 -0500169 a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
khenaidoo106c61a2021-08-11 18:05:46 -0400170
171 // Start the grpc server
172 go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
cuilin20187b2a8c32019-03-26 19:52:28 -0700173
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700174 // Register this adapter to the Core - retries indefinitely
khenaidoo106c61a2021-08-11 18:05:46 -0400175 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000176 logger.Fatal(ctx, "error-registering-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700177 }
cbabu95f21522019-11-13 14:25:18 +0100178
cbabu116b73f2019-12-10 17:56:32 +0530179 // check the readiness and liveliness and update the probe status
180 a.checkServicesReadiness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100181}
182
khenaidoo106c61a2021-08-11 18:05:46 -0400183// TODO: Any action the adapter needs to do following a Core restart?
184func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
185 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
186 return nil
187}
188
khenaidooefff76e2021-12-15 16:51:30 -0500189// getCoreServiceClientHandler is used to test whether the remote gRPC service is up
190func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
191 if conn == nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400192 return nil
193 }
khenaidooefff76e2021-12-15 16:51:30 -0500194 return core_service.NewCoreServiceClient(conn)
khenaidoo106c61a2021-08-11 18:05:46 -0400195}
196
cbabu95f21522019-11-13 14:25:18 +0100197/**
198This function checks the liveliness and readiness of the kakfa and kv-client services
199and update the status in the probe.
200*/
cbabu116b73f2019-12-10 17:56:32 +0530201func (a *adapter) checkServicesReadiness(ctx context.Context) {
202 // checks the kafka readiness
khenaidoo106c61a2021-08-11 18:05:46 -0400203 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
cbabu116b73f2019-12-10 17:56:32 +0530204
205 // checks the kv-store readiness
206 go a.checkKvStoreReadiness(ctx)
207}
208
209/**
210This function checks the liveliness and readiness of the kv-store service
211and update the status in the probe.
212*/
213func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
214 // dividing the live probe interval by 2 to get updated status every 30s
215 timeout := a.config.LiveProbeInterval / 2
216 kvStoreChannel := make(chan bool, 1)
217
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700218 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
219 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
220 cancelFunc()
221
cbabu95f21522019-11-13 14:25:18 +0100222 for {
cbabu116b73f2019-12-10 17:56:32 +0530223 timeoutTimer := time.NewTimer(timeout)
224 select {
225 case liveliness := <-kvStoreChannel:
226 if !liveliness {
227 // kv-store not reachable or down, updating the status to not ready state
khenaidoo106c61a2021-08-11 18:05:46 -0400228 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
cbabu116b73f2019-12-10 17:56:32 +0530229 timeout = a.config.NotLiveProbeInterval
230 } else {
231 // kv-store is reachable , updating the status to running state
khenaidoo106c61a2021-08-11 18:05:46 -0400232 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
cbabu116b73f2019-12-10 17:56:32 +0530233 timeout = a.config.LiveProbeInterval / 2
234 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700235
cbabu116b73f2019-12-10 17:56:32 +0530236 // Check if the timer has expired or not
237 if !timeoutTimer.Stop() {
238 <-timeoutTimer.C
239 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700240
cbabu116b73f2019-12-10 17:56:32 +0530241 case <-timeoutTimer.C:
Girish Kumarbeadc112020-02-26 18:41:02 +0000242 // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
Neha Sharma96b7bf22020-06-15 10:37:32 +0000243 logger.Info(ctx, "kv-store liveliness-recheck")
Girish Kumarbeadc112020-02-26 18:41:02 +0000244 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
245
246 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
247 // Cleanup cancel func resources
248 cancelFunc()
cbabu95f21522019-11-13 14:25:18 +0100249 }
cbabu116b73f2019-12-10 17:56:32 +0530250 }
251}
252
npujarec5762e2020-01-01 14:08:48 +0530253func (a *adapter) stop(ctx context.Context) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700254 // Stop leadership tracking
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700255 a.halted = true
cuilin20187b2a8c32019-03-26 19:52:28 -0700256
257 // send exit signal
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700258 a.exitChannel <- 0
cuilin20187b2a8c32019-03-26 19:52:28 -0700259
khenaidooefff76e2021-12-15 16:51:30 -0500260 // Stop all grpc processing
261 if err := a.oltAdapter.Stop(ctx); err != nil {
262 logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
263 }
264 if err := a.oltInterAdapter.Stop(ctx); err != nil {
265 logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
266 }
267
cuilin20187b2a8c32019-03-26 19:52:28 -0700268 // Cleanup - applies only if we had a kvClient
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700269 if a.kvClient != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700270 // Release all reservations
npujarec5762e2020-01-01 14:08:48 +0530271 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000272 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700273 }
274 // Close the DB connection
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700275 go a.kvClient.Close(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700276 }
277
khenaidoo106c61a2021-08-11 18:05:46 -0400278 if a.eventProxy != nil {
279 a.eventProxy.Stop()
Scott Bakere701b862020-02-20 16:19:16 -0800280 }
281
khenaidoo106c61a2021-08-11 18:05:46 -0400282 if a.kafkaClient != nil {
283 a.kafkaClient.Stop(ctx)
284 }
285
286 // Stop core client
287 if a.coreClient != nil {
288 a.coreClient.Stop(ctx)
289 }
290
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700291 logger.Info(ctx, "main-stop-processing-complete")
292
khenaidoo106c61a2021-08-11 18:05:46 -0400293 // TODO: Stop child devices connections
294
cuilin20187b2a8c32019-03-26 19:52:28 -0700295 // TODO: More cleanup
296}
297
Neha Sharma96b7bf22020-06-15 10:37:32 +0000298func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700299
Neha Sharma96b7bf22020-06-15 10:37:32 +0000300 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700301 switch storeType {
cuilin20187b2a8c32019-03-26 19:52:28 -0700302 case "etcd":
Neha Sharma96b7bf22020-06-15 10:37:32 +0000303 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
cuilin20187b2a8c32019-03-26 19:52:28 -0700304 }
305 return nil, errors.New("unsupported-kv-store")
306}
307
Neha Sharma96b7bf22020-06-15 10:37:32 +0000308func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700309
Neha Sharma96b7bf22020-06-15 10:37:32 +0000310 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700311 switch clientType {
312 case "sarama":
313 return kafka.NewSaramaClient(
Neha Sharma3f221ae2020-04-29 19:02:12 +0000314 kafka.Address(address),
cuilin20187b2a8c32019-03-26 19:52:28 -0700315 kafka.ProducerReturnOnErrors(true),
316 kafka.ProducerReturnOnSuccess(true),
317 kafka.ProducerMaxRetries(6),
Abhilash S.L3b494632019-07-16 15:51:09 +0530318 kafka.ProducerRetryBackoff(time.Millisecond*30),
319 kafka.MetadatMaxRetries(15)), nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700320 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700321
cuilin20187b2a8c32019-03-26 19:52:28 -0700322 return nil, errors.New("unsupported-client-type")
323}
324
Neha Sharma96b7bf22020-06-15 10:37:32 +0000325func (a *adapter) setKVClient(ctx context.Context) error {
326 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
cuilin20187b2a8c32019-03-26 19:52:28 -0700327 if err != nil {
328 a.kvClient = nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700329 return err
330 }
331 a.kvClient = client
divyadesaia37f78b2020-02-07 12:41:22 +0000332
cuilin20187b2a8c32019-03-26 19:52:28 -0700333 return nil
334}
335
khenaidoo106c61a2021-08-11 18:05:46 -0400336// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
337func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
338 logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
339
340 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
341 logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
342
343 server.Start(ctx)
344 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
cuilin20187b2a8c32019-03-26 19:52:28 -0700345}
346
khenaidoodc2116e2021-10-19 17:33:19 -0400347func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400348 logger.Info(ctx, "adding-adapter-service")
349
350 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400351 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400352 })
353}
354
khenaidoodc2116e2021-10-19 17:33:19 -0400355func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler olt_inter_adapter_service.OltInterAdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400356 logger.Info(ctx, "adding-olt-inter-adapter-service")
357
358 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400359 olt_inter_adapter_service.RegisterOltInterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400360 })
361}
362
363func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800364 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000365 logger.Info(ctx, "starting-open-olt")
cuilin20187b2a8c32019-03-26 19:52:28 -0700366 var err error
khenaidoo106c61a2021-08-11 18:05:46 -0400367 sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
cuilin20187b2a8c32019-03-26 19:52:28 -0700368
369 if err = sOLT.Start(ctx); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700370 return nil, err
371 }
372
Neha Sharma96b7bf22020-06-15 10:37:32 +0000373 logger.Info(ctx, "open-olt-started")
cuilin20187b2a8c32019-03-26 19:52:28 -0700374 return sOLT, nil
375}
376
khenaidooefff76e2021-12-15 16:51:30 -0500377func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
378 logger.Info(ctx, "starting-open-olt-inter-adapter")
379 var err error
380 sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
381
382 if err = sOLTInterAdapter.Start(ctx); err != nil {
383 return nil, err
384 }
385
386 logger.Info(ctx, "open-olt-inter-adapter-started")
387 return sOLTInterAdapter, nil
388}
389
khenaidoo106c61a2021-08-11 18:05:46 -0400390func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700391 adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000392 logger.Infow(ctx, "registering-with-core", log.Fields{
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700393 "adapterID": adapterID,
394 "currentReplica": a.config.CurrentReplica,
395 "totalReplicas": a.config.TotalReplicas,
396 })
397 adapterDescription := &voltha.Adapter{
398 Id: adapterID, // Unique name for the device type
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400399 Vendor: "VOLTHA OpenOLT",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700400 Version: version.VersionInfo.Version,
khenaidoo106c61a2021-08-11 18:05:46 -0400401 // The Endpoint refers to the address this service is listening on.
402 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700403 Type: "openolt",
404 CurrentReplica: int32(a.config.CurrentReplica),
405 TotalReplicas: int32(a.config.TotalReplicas),
406 }
407 types := []*voltha.DeviceType{{
408 Id: "openolt",
khenaidoo106c61a2021-08-11 18:05:46 -0400409 AdapterType: "openolt", // Type of the adapter that handles device type
410 Adapter: "openolt", // Deprecated attribute
Girish Gowdru0c588b22019-04-23 23:24:56 -0400411 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
412 AcceptsAddRemoveFlowUpdates: true}}
cuilin20187b2a8c32019-03-26 19:52:28 -0700413 deviceTypes := &voltha.DeviceTypes{Items: types}
414 count := 0
415 for {
khenaidoo106c61a2021-08-11 18:05:46 -0400416 gClient, err := a.coreClient.GetCoreServiceClient()
417 if gClient != nil {
khenaidoodc2116e2021-10-19 17:33:19 -0400418 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ca.AdapterRegistration{
khenaidoo106c61a2021-08-11 18:05:46 -0400419 Adapter: adapterDescription,
420 DTypes: deviceTypes}); err == nil {
421 break
cuilin20187b2a8c32019-03-26 19:52:28 -0700422 }
cuilin20187b2a8c32019-03-26 19:52:28 -0700423 }
khenaidoo106c61a2021-08-11 18:05:46 -0400424 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
425 if retries == count {
426 return err
427 }
428 count++
429 // Take a nap before retrying
430 time.Sleep(2 * time.Second)
cuilin20187b2a8c32019-03-26 19:52:28 -0700431 }
khenaidoo106c61a2021-08-11 18:05:46 -0400432 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000433 logger.Info(ctx, "registered-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700434 return nil
435}
436
Neha Sharma96b7bf22020-06-15 10:37:32 +0000437func waitForExit(ctx context.Context) int {
cuilin20187b2a8c32019-03-26 19:52:28 -0700438 signalChannel := make(chan os.Signal, 1)
439 signal.Notify(signalChannel,
440 syscall.SIGHUP,
441 syscall.SIGINT,
442 syscall.SIGTERM,
443 syscall.SIGQUIT)
444
445 exitChannel := make(chan int)
446
447 go func() {
448 s := <-signalChannel
449 switch s {
450 case syscall.SIGHUP,
451 syscall.SIGINT,
452 syscall.SIGTERM,
453 syscall.SIGQUIT:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000454 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700455 exitChannel <- 0
456 default:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000457 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700458 exitChannel <- 1
459 }
460 }()
461
462 code := <-exitChannel
463 return code
464}
465
466func printBanner() {
David K. Bainbridge794735f2020-02-11 21:01:37 -0800467 fmt.Println(` ____ ____ _ _______ `)
468 fmt.Println(` / _ \ / __ \| | |__ __|`)
469 fmt.Println(` | | | |_ __ ___ _ __ | | | | | | | `)
470 fmt.Println(` | | | | '_ \ / _ \ '_ \ | | | | | | | `)
471 fmt.Println(` | |__| | |_) | __/ | | || |__| | |____| | `)
472 fmt.Println(` \____/| .__/ \___|_| |_| \____/|______|_| `)
473 fmt.Println(` | | `)
474 fmt.Println(` |_| `)
475 fmt.Println(` `)
cuilin20187b2a8c32019-03-26 19:52:28 -0700476}
477
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400478func printVersion() {
479 fmt.Println("VOLTHA OpenOLT Adapter")
480 fmt.Println(version.VersionInfo.String(" "))
481}
482
cuilin20187b2a8c32019-03-26 19:52:28 -0700483func main() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000484 ctx := context.Background()
cuilin20187b2a8c32019-03-26 19:52:28 -0700485 start := time.Now()
486
487 cf := config.NewAdapterFlags()
488 cf.ParseCommandArguments()
489
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700490 // Setup logging
cuilin20187b2a8c32019-03-26 19:52:28 -0700491
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000492 logLevel, err := log.StringToLogLevel(cf.LogLevel)
493 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000494 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000495 }
Rohan Agrawal2488f192020-01-31 09:26:55 +0000496
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700497 // Setup default logger - applies for packages that do not have specific logger set
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000498 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000499 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700500 }
501
502 // Update all loggers (provisionned via init) with a common field
Hardik Windlassb9c869b2019-10-10 08:34:32 +0000503 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000504 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700505 }
506
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000507 log.SetAllLogLevel(logLevel)
Rohan Agrawal93bced32020-02-11 10:16:01 +0000508
Matteo Scandolo8f2b9572020-02-28 15:35:23 -0800509 realMain()
510
Kent Hagermane6ff1012020-07-14 15:07:53 -0400511 defer func() {
512 err := log.CleanUp()
513 if err != nil {
514 logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
515 }
516 }()
cuilin20187b2a8c32019-03-26 19:52:28 -0700517
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400518 // Print version / build information and exit
519 if cf.DisplayVersionOnly {
520 printVersion()
521 return
522 }
523
cuilin20187b2a8c32019-03-26 19:52:28 -0700524 // Print banner if specified
525 if cf.Banner {
526 printBanner()
527 }
528
Neha Sharma96b7bf22020-06-15 10:37:32 +0000529 logger.Infow(ctx, "config", log.Fields{"config": *cf})
cuilin20187b2a8c32019-03-26 19:52:28 -0700530
531 ctx, cancel := context.WithCancel(context.Background())
532 defer cancel()
533
534 ad := newAdapter(cf)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000535
536 p := &probe.Probe{}
Neha Sharma96b7bf22020-06-15 10:37:32 +0000537 go p.ListenAndServe(ctx, ad.config.ProbeAddress)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000538
539 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
540
Girish Kumar935f7af2020-08-18 11:59:42 +0000541 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
Girish Kumar11e15972020-06-15 14:51:10 +0000542 if err != nil {
543 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
544 } else {
545 defer log.TerminateTracing(closer)
546 }
547
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000548 go ad.start(probeCtx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700549
Neha Sharma96b7bf22020-06-15 10:37:32 +0000550 code := waitForExit(ctx)
551 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
cuilin20187b2a8c32019-03-26 19:52:28 -0700552
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700553 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
554 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700555 // Cleanup before leaving
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700556 ad.stop(ctxWithCancel)
557 // Will halt any long-running stop routine gracefully
558 cancelFunc()
cuilin20187b2a8c32019-03-26 19:52:28 -0700559
560 elapsed := time.Since(start)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000561 logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
cuilin20187b2a8c32019-03-26 19:52:28 -0700562}