blob: a01976a96bda1d4323132235e8a9cd16de89920c [file] [log] [blame]
cuilin20187b2a8c32019-03-26 19:52:28 -07001/*
Joey Armstrongf9bffdf2022-12-27 07:05:28 -05002* Copyright 2018-2023 Open Networking Foundation (ONF) and the ONF Contributors
cuilin20187b2a8c32019-03-26 19:52:28 -07003
cbabu116b73f2019-12-10 17:56:32 +05304* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
cuilin20187b2a8c32019-03-26 19:52:28 -07007
cbabu116b73f2019-12-10 17:56:32 +05308* http://www.apache.org/licenses/LICENSE-2.0
cuilin20187b2a8c32019-03-26 19:52:28 -07009
cbabu116b73f2019-12-10 17:56:32 +053010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
cuilin20187b2a8c32019-03-26 19:52:28 -070015 */
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070016
Joey Armstrong87b55f72023-06-27 12:12:53 -040017// Package main invokes the application
cuilin20187b2a8c32019-03-26 19:52:28 -070018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
nikesh.krishnan6dd882b2023-03-14 10:02:41 +053024 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 codes "google.golang.org/grpc/codes"
kdarapu381c6902019-07-31 18:23:16 +053026 "os"
27 "os/signal"
kdarapu381c6902019-07-31 18:23:16 +053028 "syscall"
29 "time"
30
khenaidoo106c61a2021-08-11 18:05:46 -040031 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
35 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
36 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v7/pkg/log"
38 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v7/pkg/version"
Scott Bakerdbd960e2020-02-28 08:57:51 -080040 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
41 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
khenaidoodc2116e2021-10-19 17:33:19 -040042 "github.com/opencord/voltha-protos/v5/go/adapter_service"
43 ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
44 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoodc2116e2021-10-19 17:33:19 -040045 "github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
khenaidoo106c61a2021-08-11 18:05:46 -040046 "github.com/opencord/voltha-protos/v5/go/voltha"
47 "google.golang.org/grpc"
48)
49
50const (
51 clusterMessagingService = "cluster-message-service"
52 oltAdapterService = "olt-adapter-service"
53 kvService = "kv-service"
54 coreService = "core-service"
cuilin20187b2a8c32019-03-26 19:52:28 -070055)
56
57type adapter struct {
khenaidooefff76e2021-12-15 16:51:30 -050058 instanceID string
59 config *config.AdapterFlags
60 grpcServer *vgrpc.GrpcServer
61 oltAdapter *ac.OpenOLT
62 oltInterAdapter *ac.OpenOLTInterAdapter
63 kafkaClient kafka.Client
64 kvClient kvstore.Client
65 coreClient *vgrpc.Client
66 eventProxy eventif.EventProxy
67 halted bool
68 exitChannel chan int
cuilin20187b2a8c32019-03-26 19:52:28 -070069}
70
cuilin20187b2a8c32019-03-26 19:52:28 -070071func newAdapter(cf *config.AdapterFlags) *adapter {
72 var a adapter
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070073 a.instanceID = cf.InstanceID
cuilin20187b2a8c32019-03-26 19:52:28 -070074 a.config = cf
75 a.halted = false
76 a.exitChannel = make(chan int, 1)
cuilin20187b2a8c32019-03-26 19:52:28 -070077 return &a
78}
79
80func (a *adapter) start(ctx context.Context) {
Neha Sharma96b7bf22020-06-15 10:37:32 +000081 logger.Info(ctx, "Starting Core Adapter components")
cuilin20187b2a8c32019-03-26 19:52:28 -070082 var err error
83
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000084 var p *probe.Probe
85 if value := ctx.Value(probe.ProbeContextKey); value != nil {
86 if _, ok := value.(*probe.Probe); ok {
87 p = value.(*probe.Probe)
88 p.RegisterService(
Neha Sharma96b7bf22020-06-15 10:37:32 +000089 ctx,
khenaidoo106c61a2021-08-11 18:05:46 -040090 clusterMessagingService,
91 kvService,
92 oltAdapterService,
93 coreService,
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000094 )
95 }
96 }
97
cuilin20187b2a8c32019-03-26 19:52:28 -070098 // Setup KV Client
Neha Sharma96b7bf22020-06-15 10:37:32 +000099 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
100 if err = a.setKVClient(ctx); err != nil {
101 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700102 }
103
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000104 if p != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400105 p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000106 }
107
divyadesaia37f78b2020-02-07 12:41:22 +0000108 // Setup Log Config
Neha Sharma96b7bf22020-06-15 10:37:32 +0000109 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800110
divyadesaid26f6b12020-03-19 06:30:28 +0000111 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumar935f7af2020-08-18 11:59:42 +0000112 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
divyadesaia37f78b2020-02-07 12:41:22 +0000113
cuilin20187b2a8c32019-03-26 19:52:28 -0700114 // Setup Kafka Client
khenaidoo106c61a2021-08-11 18:05:46 -0400115 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000116 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700117 }
118
khenaidoo106c61a2021-08-11 18:05:46 -0400119 // Start kafka communication with the broker
120 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
121 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000122 }
123
Devmalya Paulfb990a52019-07-09 10:01:49 -0400124 // Create the event proxy to post events to KAFKA
Himani Chawlacd407802020-12-10 12:08:59 +0530125 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo106c61a2021-08-11 18:05:46 -0400126 go func() {
127 if err := a.eventProxy.Start(); err != nil {
128 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
129 }
130 }()
131
132 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
133 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo27e7ac92021-12-08 14:43:09 -0500134 if a.coreClient, err = vgrpc.NewClient(
135 a.config.AdapterEndpoint,
136 a.config.CoreEndpoint,
khenaidooefff76e2021-12-15 16:51:30 -0500137 "core_service.CoreService",
khenaidoo27e7ac92021-12-08 14:43:09 -0500138 a.coreRestarted); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400139 logger.Fatal(ctx, "grpc-client-not-created")
140 }
141 // Start the core grpc client
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530142 retryCodes := []codes.Code{
143 codes.Unavailable, // server is currently unavailable
144 codes.DeadlineExceeded, // deadline for the operation was exceeded
145 }
nikesh.krishnan97e74d22023-06-28 13:54:01 +0530146 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
147 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
148 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530149 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
150 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Devmalya Paulfb990a52019-07-09 10:01:49 -0400151
cuilin20187b2a8c32019-03-26 19:52:28 -0700152 // Create the open OLT adapter
khenaidoo106c61a2021-08-11 18:05:46 -0400153 if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000154 logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700155 }
156
khenaidooefff76e2021-12-15 16:51:30 -0500157 // Create the open OLT Inter adapter adapter
158 if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
159 logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
160 }
161
khenaidoo106c61a2021-08-11 18:05:46 -0400162 // Create and start the grpc server
163 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
164
165 //Register the adapter service
166 a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
167
168 //Register the olt inter-adapter service
khenaidooefff76e2021-12-15 16:51:30 -0500169 a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
khenaidoo106c61a2021-08-11 18:05:46 -0400170
171 // Start the grpc server
172 go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
cuilin20187b2a8c32019-03-26 19:52:28 -0700173
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700174 // Register this adapter to the Core - retries indefinitely
khenaidoo106c61a2021-08-11 18:05:46 -0400175 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000176 logger.Fatal(ctx, "error-registering-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700177 }
cbabu95f21522019-11-13 14:25:18 +0100178
cbabu116b73f2019-12-10 17:56:32 +0530179 // check the readiness and liveliness and update the probe status
180 a.checkServicesReadiness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100181}
182
khenaidoo106c61a2021-08-11 18:05:46 -0400183// TODO: Any action the adapter needs to do following a Core restart?
184func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
185 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
186 return nil
187}
188
khenaidooefff76e2021-12-15 16:51:30 -0500189// getCoreServiceClientHandler is used to test whether the remote gRPC service is up
190func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
191 if conn == nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400192 return nil
193 }
khenaidooefff76e2021-12-15 16:51:30 -0500194 return core_service.NewCoreServiceClient(conn)
khenaidoo106c61a2021-08-11 18:05:46 -0400195}
196
Joey Armstrong87b55f72023-06-27 12:12:53 -0400197/*
198*
cbabu95f21522019-11-13 14:25:18 +0100199This function checks the liveliness and readiness of the kakfa and kv-client services
200and update the status in the probe.
201*/
cbabu116b73f2019-12-10 17:56:32 +0530202func (a *adapter) checkServicesReadiness(ctx context.Context) {
203 // checks the kafka readiness
khenaidoo106c61a2021-08-11 18:05:46 -0400204 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
cbabu116b73f2019-12-10 17:56:32 +0530205
206 // checks the kv-store readiness
207 go a.checkKvStoreReadiness(ctx)
208}
209
Joey Armstrong87b55f72023-06-27 12:12:53 -0400210/*
211*
cbabu116b73f2019-12-10 17:56:32 +0530212This function checks the liveliness and readiness of the kv-store service
213and update the status in the probe.
214*/
215func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
216 // dividing the live probe interval by 2 to get updated status every 30s
217 timeout := a.config.LiveProbeInterval / 2
218 kvStoreChannel := make(chan bool, 1)
219
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700220 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
221 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
222 cancelFunc()
223
cbabu95f21522019-11-13 14:25:18 +0100224 for {
cbabu116b73f2019-12-10 17:56:32 +0530225 timeoutTimer := time.NewTimer(timeout)
226 select {
227 case liveliness := <-kvStoreChannel:
228 if !liveliness {
229 // kv-store not reachable or down, updating the status to not ready state
khenaidoo106c61a2021-08-11 18:05:46 -0400230 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
cbabu116b73f2019-12-10 17:56:32 +0530231 timeout = a.config.NotLiveProbeInterval
232 } else {
233 // kv-store is reachable , updating the status to running state
khenaidoo106c61a2021-08-11 18:05:46 -0400234 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
cbabu116b73f2019-12-10 17:56:32 +0530235 timeout = a.config.LiveProbeInterval / 2
236 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700237
cbabu116b73f2019-12-10 17:56:32 +0530238 // Check if the timer has expired or not
239 if !timeoutTimer.Stop() {
240 <-timeoutTimer.C
241 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700242
cbabu116b73f2019-12-10 17:56:32 +0530243 case <-timeoutTimer.C:
Girish Kumarbeadc112020-02-26 18:41:02 +0000244 // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
Neha Sharma96b7bf22020-06-15 10:37:32 +0000245 logger.Info(ctx, "kv-store liveliness-recheck")
Girish Kumarbeadc112020-02-26 18:41:02 +0000246 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
247
248 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
249 // Cleanup cancel func resources
250 cancelFunc()
cbabu95f21522019-11-13 14:25:18 +0100251 }
cbabu116b73f2019-12-10 17:56:32 +0530252 }
253}
254
npujarec5762e2020-01-01 14:08:48 +0530255func (a *adapter) stop(ctx context.Context) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700256 // Stop leadership tracking
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700257 a.halted = true
cuilin20187b2a8c32019-03-26 19:52:28 -0700258
259 // send exit signal
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700260 a.exitChannel <- 0
cuilin20187b2a8c32019-03-26 19:52:28 -0700261
khenaidooefff76e2021-12-15 16:51:30 -0500262 // Stop all grpc processing
263 if err := a.oltAdapter.Stop(ctx); err != nil {
264 logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
265 }
266 if err := a.oltInterAdapter.Stop(ctx); err != nil {
267 logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
268 }
269
cuilin20187b2a8c32019-03-26 19:52:28 -0700270 // Cleanup - applies only if we had a kvClient
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700271 if a.kvClient != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700272 // Release all reservations
npujarec5762e2020-01-01 14:08:48 +0530273 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000274 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700275 }
276 // Close the DB connection
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700277 go a.kvClient.Close(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700278 }
279
khenaidoo106c61a2021-08-11 18:05:46 -0400280 if a.eventProxy != nil {
281 a.eventProxy.Stop()
Scott Bakere701b862020-02-20 16:19:16 -0800282 }
283
khenaidoo106c61a2021-08-11 18:05:46 -0400284 if a.kafkaClient != nil {
285 a.kafkaClient.Stop(ctx)
286 }
287
288 // Stop core client
289 if a.coreClient != nil {
290 a.coreClient.Stop(ctx)
291 }
292
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700293 logger.Info(ctx, "main-stop-processing-complete")
294
khenaidoo106c61a2021-08-11 18:05:46 -0400295 // TODO: Stop child devices connections
296
cuilin20187b2a8c32019-03-26 19:52:28 -0700297 // TODO: More cleanup
298}
299
Neha Sharma96b7bf22020-06-15 10:37:32 +0000300func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700301
Neha Sharma96b7bf22020-06-15 10:37:32 +0000302 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700303 switch storeType {
cuilin20187b2a8c32019-03-26 19:52:28 -0700304 case "etcd":
Neha Sharma96b7bf22020-06-15 10:37:32 +0000305 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
cuilin20187b2a8c32019-03-26 19:52:28 -0700306 }
307 return nil, errors.New("unsupported-kv-store")
308}
309
Neha Sharma96b7bf22020-06-15 10:37:32 +0000310func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700311
Neha Sharma96b7bf22020-06-15 10:37:32 +0000312 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700313 switch clientType {
314 case "sarama":
315 return kafka.NewSaramaClient(
Neha Sharma3f221ae2020-04-29 19:02:12 +0000316 kafka.Address(address),
cuilin20187b2a8c32019-03-26 19:52:28 -0700317 kafka.ProducerReturnOnErrors(true),
318 kafka.ProducerReturnOnSuccess(true),
319 kafka.ProducerMaxRetries(6),
Abhilash S.L3b494632019-07-16 15:51:09 +0530320 kafka.ProducerRetryBackoff(time.Millisecond*30),
321 kafka.MetadatMaxRetries(15)), nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700322 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700323
cuilin20187b2a8c32019-03-26 19:52:28 -0700324 return nil, errors.New("unsupported-client-type")
325}
326
Neha Sharma96b7bf22020-06-15 10:37:32 +0000327func (a *adapter) setKVClient(ctx context.Context) error {
328 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
cuilin20187b2a8c32019-03-26 19:52:28 -0700329 if err != nil {
330 a.kvClient = nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700331 return err
332 }
333 a.kvClient = client
divyadesaia37f78b2020-02-07 12:41:22 +0000334
cuilin20187b2a8c32019-03-26 19:52:28 -0700335 return nil
336}
337
khenaidoo106c61a2021-08-11 18:05:46 -0400338// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
339func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
340 logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
341
342 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
343 logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
344
345 server.Start(ctx)
346 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
cuilin20187b2a8c32019-03-26 19:52:28 -0700347}
348
khenaidoodc2116e2021-10-19 17:33:19 -0400349func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400350 logger.Info(ctx, "adding-adapter-service")
351
352 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400353 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400354 })
355}
356
khenaidoodc2116e2021-10-19 17:33:19 -0400357func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler olt_inter_adapter_service.OltInterAdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400358 logger.Info(ctx, "adding-olt-inter-adapter-service")
359
360 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400361 olt_inter_adapter_service.RegisterOltInterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400362 })
363}
364
365func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800366 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000367 logger.Info(ctx, "starting-open-olt")
cuilin20187b2a8c32019-03-26 19:52:28 -0700368 var err error
khenaidoo106c61a2021-08-11 18:05:46 -0400369 sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
cuilin20187b2a8c32019-03-26 19:52:28 -0700370
371 if err = sOLT.Start(ctx); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700372 return nil, err
373 }
374
Neha Sharma96b7bf22020-06-15 10:37:32 +0000375 logger.Info(ctx, "open-olt-started")
cuilin20187b2a8c32019-03-26 19:52:28 -0700376 return sOLT, nil
377}
378
khenaidooefff76e2021-12-15 16:51:30 -0500379func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
380 logger.Info(ctx, "starting-open-olt-inter-adapter")
381 var err error
382 sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
383
384 if err = sOLTInterAdapter.Start(ctx); err != nil {
385 return nil, err
386 }
387
388 logger.Info(ctx, "open-olt-inter-adapter-started")
389 return sOLTInterAdapter, nil
390}
391
khenaidoo106c61a2021-08-11 18:05:46 -0400392func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700393 adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000394 logger.Infow(ctx, "registering-with-core", log.Fields{
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700395 "adapterID": adapterID,
396 "currentReplica": a.config.CurrentReplica,
397 "totalReplicas": a.config.TotalReplicas,
398 })
399 adapterDescription := &voltha.Adapter{
400 Id: adapterID, // Unique name for the device type
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400401 Vendor: "VOLTHA OpenOLT",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700402 Version: version.VersionInfo.Version,
khenaidoo106c61a2021-08-11 18:05:46 -0400403 // The Endpoint refers to the address this service is listening on.
404 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700405 Type: "openolt",
406 CurrentReplica: int32(a.config.CurrentReplica),
407 TotalReplicas: int32(a.config.TotalReplicas),
408 }
409 types := []*voltha.DeviceType{{
410 Id: "openolt",
khenaidoo106c61a2021-08-11 18:05:46 -0400411 AdapterType: "openolt", // Type of the adapter that handles device type
412 Adapter: "openolt", // Deprecated attribute
Girish Gowdru0c588b22019-04-23 23:24:56 -0400413 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
414 AcceptsAddRemoveFlowUpdates: true}}
cuilin20187b2a8c32019-03-26 19:52:28 -0700415 deviceTypes := &voltha.DeviceTypes{Items: types}
416 count := 0
417 for {
khenaidoo106c61a2021-08-11 18:05:46 -0400418 gClient, err := a.coreClient.GetCoreServiceClient()
419 if gClient != nil {
khenaidoodc2116e2021-10-19 17:33:19 -0400420 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ca.AdapterRegistration{
khenaidoo106c61a2021-08-11 18:05:46 -0400421 Adapter: adapterDescription,
422 DTypes: deviceTypes}); err == nil {
423 break
cuilin20187b2a8c32019-03-26 19:52:28 -0700424 }
cuilin20187b2a8c32019-03-26 19:52:28 -0700425 }
khenaidoo106c61a2021-08-11 18:05:46 -0400426 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
427 if retries == count {
428 return err
429 }
430 count++
431 // Take a nap before retrying
432 time.Sleep(2 * time.Second)
cuilin20187b2a8c32019-03-26 19:52:28 -0700433 }
khenaidoo106c61a2021-08-11 18:05:46 -0400434 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000435 logger.Info(ctx, "registered-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700436 return nil
437}
438
Neha Sharma96b7bf22020-06-15 10:37:32 +0000439func waitForExit(ctx context.Context) int {
cuilin20187b2a8c32019-03-26 19:52:28 -0700440 signalChannel := make(chan os.Signal, 1)
441 signal.Notify(signalChannel,
442 syscall.SIGHUP,
443 syscall.SIGINT,
444 syscall.SIGTERM,
445 syscall.SIGQUIT)
446
447 exitChannel := make(chan int)
448
449 go func() {
450 s := <-signalChannel
451 switch s {
452 case syscall.SIGHUP,
453 syscall.SIGINT,
454 syscall.SIGTERM,
455 syscall.SIGQUIT:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000456 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700457 exitChannel <- 0
458 default:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000459 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700460 exitChannel <- 1
461 }
462 }()
463
464 code := <-exitChannel
465 return code
466}
467
468func printBanner() {
David K. Bainbridge794735f2020-02-11 21:01:37 -0800469 fmt.Println(` ____ ____ _ _______ `)
470 fmt.Println(` / _ \ / __ \| | |__ __|`)
471 fmt.Println(` | | | |_ __ ___ _ __ | | | | | | | `)
472 fmt.Println(` | | | | '_ \ / _ \ '_ \ | | | | | | | `)
473 fmt.Println(` | |__| | |_) | __/ | | || |__| | |____| | `)
474 fmt.Println(` \____/| .__/ \___|_| |_| \____/|______|_| `)
475 fmt.Println(` | | `)
476 fmt.Println(` |_| `)
477 fmt.Println(` `)
cuilin20187b2a8c32019-03-26 19:52:28 -0700478}
479
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400480func printVersion() {
481 fmt.Println("VOLTHA OpenOLT Adapter")
482 fmt.Println(version.VersionInfo.String(" "))
483}
484
cuilin20187b2a8c32019-03-26 19:52:28 -0700485func main() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000486 ctx := context.Background()
cuilin20187b2a8c32019-03-26 19:52:28 -0700487 start := time.Now()
488
489 cf := config.NewAdapterFlags()
490 cf.ParseCommandArguments()
491
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700492 // Setup logging
cuilin20187b2a8c32019-03-26 19:52:28 -0700493
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000494 logLevel, err := log.StringToLogLevel(cf.LogLevel)
495 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000496 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000497 }
Rohan Agrawal2488f192020-01-31 09:26:55 +0000498
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700499 // Setup default logger - applies for packages that do not have specific logger set
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000500 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000501 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700502 }
503
504 // Update all loggers (provisionned via init) with a common field
Hardik Windlassb9c869b2019-10-10 08:34:32 +0000505 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000506 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700507 }
508
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000509 log.SetAllLogLevel(logLevel)
Rohan Agrawal93bced32020-02-11 10:16:01 +0000510
Matteo Scandolo8f2b9572020-02-28 15:35:23 -0800511 realMain()
512
Kent Hagermane6ff1012020-07-14 15:07:53 -0400513 defer func() {
514 err := log.CleanUp()
515 if err != nil {
516 logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
517 }
518 }()
cuilin20187b2a8c32019-03-26 19:52:28 -0700519
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400520 // Print version / build information and exit
521 if cf.DisplayVersionOnly {
522 printVersion()
523 return
524 }
525
cuilin20187b2a8c32019-03-26 19:52:28 -0700526 // Print banner if specified
527 if cf.Banner {
528 printBanner()
529 }
530
Neha Sharma96b7bf22020-06-15 10:37:32 +0000531 logger.Infow(ctx, "config", log.Fields{"config": *cf})
cuilin20187b2a8c32019-03-26 19:52:28 -0700532
533 ctx, cancel := context.WithCancel(context.Background())
534 defer cancel()
535
536 ad := newAdapter(cf)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000537
538 p := &probe.Probe{}
Neha Sharma96b7bf22020-06-15 10:37:32 +0000539 go p.ListenAndServe(ctx, ad.config.ProbeAddress)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000540
541 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
542
Girish Kumar935f7af2020-08-18 11:59:42 +0000543 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
Girish Kumar11e15972020-06-15 14:51:10 +0000544 if err != nil {
545 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
546 } else {
547 defer log.TerminateTracing(closer)
548 }
549
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000550 go ad.start(probeCtx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700551
Neha Sharma96b7bf22020-06-15 10:37:32 +0000552 code := waitForExit(ctx)
553 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
cuilin20187b2a8c32019-03-26 19:52:28 -0700554
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700555 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
556 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700557 // Cleanup before leaving
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700558 ad.stop(ctxWithCancel)
559 // Will halt any long-running stop routine gracefully
560 cancelFunc()
cuilin20187b2a8c32019-03-26 19:52:28 -0700561
562 elapsed := time.Since(start)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000563 logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
cuilin20187b2a8c32019-03-26 19:52:28 -0700564}
Joey Armstrong87b55f72023-06-27 12:12:53 -0400565
566// [EOF]