blob: be2c534022ab44cdb0eaa95dd6f8b3529c57a41d [file] [log] [blame]
cuilin20187b2a8c32019-03-26 19:52:28 -07001/*
Joey Armstrong11f5a572024-01-12 19:11:32 -05002* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
cuilin20187b2a8c32019-03-26 19:52:28 -07003
cbabu116b73f2019-12-10 17:56:32 +05304* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
cuilin20187b2a8c32019-03-26 19:52:28 -07007
cbabu116b73f2019-12-10 17:56:32 +05308* http://www.apache.org/licenses/LICENSE-2.0
cuilin20187b2a8c32019-03-26 19:52:28 -07009
cbabu116b73f2019-12-10 17:56:32 +053010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
cuilin20187b2a8c32019-03-26 19:52:28 -070015 */
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070016
Joey Armstrong87b55f72023-06-27 12:12:53 -040017// Package main invokes the application
cuilin20187b2a8c32019-03-26 19:52:28 -070018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
kdarapu381c6902019-07-31 18:23:16 +053024 "os"
25 "os/signal"
kdarapu381c6902019-07-31 18:23:16 +053026 "syscall"
27 "time"
28
Abhay Kumar9bcfeb22024-07-12 09:14:25 +053029 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
30 codes "google.golang.org/grpc/codes"
31
khenaidoo106c61a2021-08-11 18:05:46 -040032 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
Scott Bakerdbd960e2020-02-28 08:57:51 -080041 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
42 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
khenaidoodc2116e2021-10-19 17:33:19 -040043 "github.com/opencord/voltha-protos/v5/go/adapter_service"
44 ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
45 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoodc2116e2021-10-19 17:33:19 -040046 "github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
khenaidoo106c61a2021-08-11 18:05:46 -040047 "github.com/opencord/voltha-protos/v5/go/voltha"
48 "google.golang.org/grpc"
49)
50
51const (
52 clusterMessagingService = "cluster-message-service"
53 oltAdapterService = "olt-adapter-service"
54 kvService = "kv-service"
55 coreService = "core-service"
cuilin20187b2a8c32019-03-26 19:52:28 -070056)
57
58type adapter struct {
khenaidooefff76e2021-12-15 16:51:30 -050059 instanceID string
60 config *config.AdapterFlags
61 grpcServer *vgrpc.GrpcServer
62 oltAdapter *ac.OpenOLT
63 oltInterAdapter *ac.OpenOLTInterAdapter
64 kafkaClient kafka.Client
65 kvClient kvstore.Client
66 coreClient *vgrpc.Client
67 eventProxy eventif.EventProxy
68 halted bool
69 exitChannel chan int
cuilin20187b2a8c32019-03-26 19:52:28 -070070}
71
cuilin20187b2a8c32019-03-26 19:52:28 -070072func newAdapter(cf *config.AdapterFlags) *adapter {
73 var a adapter
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070074 a.instanceID = cf.InstanceID
cuilin20187b2a8c32019-03-26 19:52:28 -070075 a.config = cf
76 a.halted = false
77 a.exitChannel = make(chan int, 1)
cuilin20187b2a8c32019-03-26 19:52:28 -070078 return &a
79}
80
81func (a *adapter) start(ctx context.Context) {
Neha Sharma96b7bf22020-06-15 10:37:32 +000082 logger.Info(ctx, "Starting Core Adapter components")
cuilin20187b2a8c32019-03-26 19:52:28 -070083 var err error
84
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000085 var p *probe.Probe
86 if value := ctx.Value(probe.ProbeContextKey); value != nil {
87 if _, ok := value.(*probe.Probe); ok {
88 p = value.(*probe.Probe)
89 p.RegisterService(
Neha Sharma96b7bf22020-06-15 10:37:32 +000090 ctx,
khenaidoo106c61a2021-08-11 18:05:46 -040091 clusterMessagingService,
92 kvService,
93 oltAdapterService,
94 coreService,
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000095 )
96 }
97 }
98
cuilin20187b2a8c32019-03-26 19:52:28 -070099 // Setup KV Client
Neha Sharma96b7bf22020-06-15 10:37:32 +0000100 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
101 if err = a.setKVClient(ctx); err != nil {
102 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700103 }
104
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000105 if p != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400106 p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000107 }
108
divyadesaia37f78b2020-02-07 12:41:22 +0000109 // Setup Log Config
Neha Sharma96b7bf22020-06-15 10:37:32 +0000110 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800111
divyadesaid26f6b12020-03-19 06:30:28 +0000112 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumar935f7af2020-08-18 11:59:42 +0000113 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
divyadesaia37f78b2020-02-07 12:41:22 +0000114
cuilin20187b2a8c32019-03-26 19:52:28 -0700115 // Setup Kafka Client
khenaidoo106c61a2021-08-11 18:05:46 -0400116 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000117 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700118 }
119
khenaidoo106c61a2021-08-11 18:05:46 -0400120 // Start kafka communication with the broker
121 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
122 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000123 }
124
Devmalya Paulfb990a52019-07-09 10:01:49 -0400125 // Create the event proxy to post events to KAFKA
Himani Chawlacd407802020-12-10 12:08:59 +0530126 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo106c61a2021-08-11 18:05:46 -0400127 go func() {
128 if err := a.eventProxy.Start(); err != nil {
129 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
130 }
131 }()
132
133 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
134 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo27e7ac92021-12-08 14:43:09 -0500135 if a.coreClient, err = vgrpc.NewClient(
136 a.config.AdapterEndpoint,
137 a.config.CoreEndpoint,
khenaidooefff76e2021-12-15 16:51:30 -0500138 "core_service.CoreService",
khenaidoo27e7ac92021-12-08 14:43:09 -0500139 a.coreRestarted); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400140 logger.Fatal(ctx, "grpc-client-not-created")
141 }
142 // Start the core grpc client
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530143 retryCodes := []codes.Code{
144 codes.Unavailable, // server is currently unavailable
145 codes.DeadlineExceeded, // deadline for the operation was exceeded
146 }
nikesh.krishnan97e74d22023-06-28 13:54:01 +0530147 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
148 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
149 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530150 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
151 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Devmalya Paulfb990a52019-07-09 10:01:49 -0400152
cuilin20187b2a8c32019-03-26 19:52:28 -0700153 // Create the open OLT adapter
khenaidoo106c61a2021-08-11 18:05:46 -0400154 if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000155 logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700156 }
157
khenaidooefff76e2021-12-15 16:51:30 -0500158 // Create the open OLT Inter adapter adapter
159 if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
160 logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
161 }
162
khenaidoo106c61a2021-08-11 18:05:46 -0400163 // Create and start the grpc server
164 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
165
166 //Register the adapter service
167 a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
168
169 //Register the olt inter-adapter service
khenaidooefff76e2021-12-15 16:51:30 -0500170 a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
khenaidoo106c61a2021-08-11 18:05:46 -0400171
172 // Start the grpc server
173 go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
cuilin20187b2a8c32019-03-26 19:52:28 -0700174
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700175 // Register this adapter to the Core - retries indefinitely
khenaidoo106c61a2021-08-11 18:05:46 -0400176 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000177 logger.Fatal(ctx, "error-registering-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700178 }
cbabu95f21522019-11-13 14:25:18 +0100179
cbabu116b73f2019-12-10 17:56:32 +0530180 // check the readiness and liveliness and update the probe status
181 a.checkServicesReadiness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100182}
183
khenaidoo106c61a2021-08-11 18:05:46 -0400184// TODO: Any action the adapter needs to do following a Core restart?
185func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
186 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
187 return nil
188}
189
khenaidooefff76e2021-12-15 16:51:30 -0500190// getCoreServiceClientHandler is used to test whether the remote gRPC service is up
191func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
192 if conn == nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400193 return nil
194 }
khenaidooefff76e2021-12-15 16:51:30 -0500195 return core_service.NewCoreServiceClient(conn)
khenaidoo106c61a2021-08-11 18:05:46 -0400196}
197
Joey Armstrong87b55f72023-06-27 12:12:53 -0400198/*
199*
cbabu95f21522019-11-13 14:25:18 +0100200This function checks the liveliness and readiness of the kakfa and kv-client services
201and update the status in the probe.
202*/
cbabu116b73f2019-12-10 17:56:32 +0530203func (a *adapter) checkServicesReadiness(ctx context.Context) {
204 // checks the kafka readiness
khenaidoo106c61a2021-08-11 18:05:46 -0400205 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
cbabu116b73f2019-12-10 17:56:32 +0530206
207 // checks the kv-store readiness
208 go a.checkKvStoreReadiness(ctx)
209}
210
Joey Armstrong87b55f72023-06-27 12:12:53 -0400211/*
212*
cbabu116b73f2019-12-10 17:56:32 +0530213This function checks the liveliness and readiness of the kv-store service
214and update the status in the probe.
215*/
216func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
217 // dividing the live probe interval by 2 to get updated status every 30s
218 timeout := a.config.LiveProbeInterval / 2
219 kvStoreChannel := make(chan bool, 1)
220
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700221 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
222 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
223 cancelFunc()
224
cbabu95f21522019-11-13 14:25:18 +0100225 for {
cbabu116b73f2019-12-10 17:56:32 +0530226 timeoutTimer := time.NewTimer(timeout)
227 select {
228 case liveliness := <-kvStoreChannel:
229 if !liveliness {
230 // kv-store not reachable or down, updating the status to not ready state
khenaidoo106c61a2021-08-11 18:05:46 -0400231 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
cbabu116b73f2019-12-10 17:56:32 +0530232 timeout = a.config.NotLiveProbeInterval
233 } else {
234 // kv-store is reachable , updating the status to running state
khenaidoo106c61a2021-08-11 18:05:46 -0400235 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
cbabu116b73f2019-12-10 17:56:32 +0530236 timeout = a.config.LiveProbeInterval / 2
237 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700238
cbabu116b73f2019-12-10 17:56:32 +0530239 // Check if the timer has expired or not
240 if !timeoutTimer.Stop() {
241 <-timeoutTimer.C
242 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700243
cbabu116b73f2019-12-10 17:56:32 +0530244 case <-timeoutTimer.C:
Girish Kumarbeadc112020-02-26 18:41:02 +0000245 // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
Neha Sharma96b7bf22020-06-15 10:37:32 +0000246 logger.Info(ctx, "kv-store liveliness-recheck")
Girish Kumarbeadc112020-02-26 18:41:02 +0000247 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
248
249 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
250 // Cleanup cancel func resources
251 cancelFunc()
cbabu95f21522019-11-13 14:25:18 +0100252 }
cbabu116b73f2019-12-10 17:56:32 +0530253 }
254}
255
npujarec5762e2020-01-01 14:08:48 +0530256func (a *adapter) stop(ctx context.Context) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700257 // Stop leadership tracking
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700258 a.halted = true
cuilin20187b2a8c32019-03-26 19:52:28 -0700259
260 // send exit signal
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700261 a.exitChannel <- 0
cuilin20187b2a8c32019-03-26 19:52:28 -0700262
khenaidooefff76e2021-12-15 16:51:30 -0500263 // Stop all grpc processing
264 if err := a.oltAdapter.Stop(ctx); err != nil {
265 logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
266 }
267 if err := a.oltInterAdapter.Stop(ctx); err != nil {
268 logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
269 }
270
cuilin20187b2a8c32019-03-26 19:52:28 -0700271 // Cleanup - applies only if we had a kvClient
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700272 if a.kvClient != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700273 // Release all reservations
npujarec5762e2020-01-01 14:08:48 +0530274 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000275 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700276 }
277 // Close the DB connection
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700278 go a.kvClient.Close(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700279 }
280
khenaidoo106c61a2021-08-11 18:05:46 -0400281 if a.eventProxy != nil {
282 a.eventProxy.Stop()
Scott Bakere701b862020-02-20 16:19:16 -0800283 }
284
khenaidoo106c61a2021-08-11 18:05:46 -0400285 if a.kafkaClient != nil {
286 a.kafkaClient.Stop(ctx)
287 }
288
289 // Stop core client
290 if a.coreClient != nil {
291 a.coreClient.Stop(ctx)
292 }
293
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700294 logger.Info(ctx, "main-stop-processing-complete")
295
khenaidoo106c61a2021-08-11 18:05:46 -0400296 // TODO: Stop child devices connections
297
cuilin20187b2a8c32019-03-26 19:52:28 -0700298 // TODO: More cleanup
299}
300
Neha Sharma96b7bf22020-06-15 10:37:32 +0000301func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700302
Neha Sharma96b7bf22020-06-15 10:37:32 +0000303 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700304 switch storeType {
cuilin20187b2a8c32019-03-26 19:52:28 -0700305 case "etcd":
Neha Sharma96b7bf22020-06-15 10:37:32 +0000306 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Abhay Kumar9bcfeb22024-07-12 09:14:25 +0530307 case "redis":
308 return kvstore.NewRedisClient(address, timeout, false)
309 case "redis-sentinel":
310 return kvstore.NewRedisClient(address, timeout, true)
cuilin20187b2a8c32019-03-26 19:52:28 -0700311 }
312 return nil, errors.New("unsupported-kv-store")
313}
314
Neha Sharma96b7bf22020-06-15 10:37:32 +0000315func newKafkaClient(ctx context.Context, clientType, address string) (kafka.Client, error) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700316
Neha Sharma96b7bf22020-06-15 10:37:32 +0000317 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700318 switch clientType {
319 case "sarama":
320 return kafka.NewSaramaClient(
Neha Sharma3f221ae2020-04-29 19:02:12 +0000321 kafka.Address(address),
cuilin20187b2a8c32019-03-26 19:52:28 -0700322 kafka.ProducerReturnOnErrors(true),
323 kafka.ProducerReturnOnSuccess(true),
324 kafka.ProducerMaxRetries(6),
Abhilash S.L3b494632019-07-16 15:51:09 +0530325 kafka.ProducerRetryBackoff(time.Millisecond*30),
326 kafka.MetadatMaxRetries(15)), nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700327 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700328
cuilin20187b2a8c32019-03-26 19:52:28 -0700329 return nil, errors.New("unsupported-client-type")
330}
331
Neha Sharma96b7bf22020-06-15 10:37:32 +0000332func (a *adapter) setKVClient(ctx context.Context) error {
333 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
cuilin20187b2a8c32019-03-26 19:52:28 -0700334 if err != nil {
335 a.kvClient = nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700336 return err
337 }
338 a.kvClient = client
divyadesaia37f78b2020-02-07 12:41:22 +0000339
cuilin20187b2a8c32019-03-26 19:52:28 -0700340 return nil
341}
342
khenaidoo106c61a2021-08-11 18:05:46 -0400343// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
344func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
345 logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
346
347 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
348 logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
349
350 server.Start(ctx)
351 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
cuilin20187b2a8c32019-03-26 19:52:28 -0700352}
353
khenaidoodc2116e2021-10-19 17:33:19 -0400354func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400355 logger.Info(ctx, "adding-adapter-service")
356
357 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400358 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400359 })
360}
361
khenaidoodc2116e2021-10-19 17:33:19 -0400362func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler olt_inter_adapter_service.OltInterAdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400363 logger.Info(ctx, "adding-olt-inter-adapter-service")
364
365 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400366 olt_inter_adapter_service.RegisterOltInterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400367 })
368}
369
370func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800371 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000372 logger.Info(ctx, "starting-open-olt")
cuilin20187b2a8c32019-03-26 19:52:28 -0700373 var err error
khenaidoo106c61a2021-08-11 18:05:46 -0400374 sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
cuilin20187b2a8c32019-03-26 19:52:28 -0700375
376 if err = sOLT.Start(ctx); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700377 return nil, err
378 }
379
Neha Sharma96b7bf22020-06-15 10:37:32 +0000380 logger.Info(ctx, "open-olt-started")
cuilin20187b2a8c32019-03-26 19:52:28 -0700381 return sOLT, nil
382}
383
khenaidooefff76e2021-12-15 16:51:30 -0500384func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
385 logger.Info(ctx, "starting-open-olt-inter-adapter")
386 var err error
387 sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
388
389 if err = sOLTInterAdapter.Start(ctx); err != nil {
390 return nil, err
391 }
392
393 logger.Info(ctx, "open-olt-inter-adapter-started")
394 return sOLTInterAdapter, nil
395}
396
khenaidoo106c61a2021-08-11 18:05:46 -0400397func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700398 adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000399 logger.Infow(ctx, "registering-with-core", log.Fields{
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700400 "adapterID": adapterID,
401 "currentReplica": a.config.CurrentReplica,
402 "totalReplicas": a.config.TotalReplicas,
403 })
404 adapterDescription := &voltha.Adapter{
405 Id: adapterID, // Unique name for the device type
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400406 Vendor: "VOLTHA OpenOLT",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700407 Version: version.VersionInfo.Version,
khenaidoo106c61a2021-08-11 18:05:46 -0400408 // The Endpoint refers to the address this service is listening on.
409 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700410 Type: "openolt",
411 CurrentReplica: int32(a.config.CurrentReplica),
412 TotalReplicas: int32(a.config.TotalReplicas),
413 }
414 types := []*voltha.DeviceType{{
415 Id: "openolt",
khenaidoo106c61a2021-08-11 18:05:46 -0400416 AdapterType: "openolt", // Type of the adapter that handles device type
417 Adapter: "openolt", // Deprecated attribute
Girish Gowdru0c588b22019-04-23 23:24:56 -0400418 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
419 AcceptsAddRemoveFlowUpdates: true}}
cuilin20187b2a8c32019-03-26 19:52:28 -0700420 deviceTypes := &voltha.DeviceTypes{Items: types}
421 count := 0
422 for {
khenaidoo106c61a2021-08-11 18:05:46 -0400423 gClient, err := a.coreClient.GetCoreServiceClient()
424 if gClient != nil {
khenaidoodc2116e2021-10-19 17:33:19 -0400425 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ca.AdapterRegistration{
khenaidoo106c61a2021-08-11 18:05:46 -0400426 Adapter: adapterDescription,
427 DTypes: deviceTypes}); err == nil {
428 break
cuilin20187b2a8c32019-03-26 19:52:28 -0700429 }
cuilin20187b2a8c32019-03-26 19:52:28 -0700430 }
khenaidoo106c61a2021-08-11 18:05:46 -0400431 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
432 if retries == count {
433 return err
434 }
435 count++
436 // Take a nap before retrying
437 time.Sleep(2 * time.Second)
cuilin20187b2a8c32019-03-26 19:52:28 -0700438 }
khenaidoo106c61a2021-08-11 18:05:46 -0400439 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000440 logger.Info(ctx, "registered-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700441 return nil
442}
443
Neha Sharma96b7bf22020-06-15 10:37:32 +0000444func waitForExit(ctx context.Context) int {
cuilin20187b2a8c32019-03-26 19:52:28 -0700445 signalChannel := make(chan os.Signal, 1)
446 signal.Notify(signalChannel,
447 syscall.SIGHUP,
448 syscall.SIGINT,
449 syscall.SIGTERM,
450 syscall.SIGQUIT)
451
452 exitChannel := make(chan int)
453
454 go func() {
455 s := <-signalChannel
456 switch s {
457 case syscall.SIGHUP,
458 syscall.SIGINT,
459 syscall.SIGTERM,
460 syscall.SIGQUIT:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000461 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700462 exitChannel <- 0
463 default:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000464 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700465 exitChannel <- 1
466 }
467 }()
468
469 code := <-exitChannel
470 return code
471}
472
473func printBanner() {
David K. Bainbridge794735f2020-02-11 21:01:37 -0800474 fmt.Println(` ____ ____ _ _______ `)
475 fmt.Println(` / _ \ / __ \| | |__ __|`)
476 fmt.Println(` | | | |_ __ ___ _ __ | | | | | | | `)
477 fmt.Println(` | | | | '_ \ / _ \ '_ \ | | | | | | | `)
478 fmt.Println(` | |__| | |_) | __/ | | || |__| | |____| | `)
479 fmt.Println(` \____/| .__/ \___|_| |_| \____/|______|_| `)
480 fmt.Println(` | | `)
481 fmt.Println(` |_| `)
482 fmt.Println(` `)
cuilin20187b2a8c32019-03-26 19:52:28 -0700483}
484
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400485func printVersion() {
486 fmt.Println("VOLTHA OpenOLT Adapter")
487 fmt.Println(version.VersionInfo.String(" "))
488}
489
cuilin20187b2a8c32019-03-26 19:52:28 -0700490func main() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000491 ctx := context.Background()
cuilin20187b2a8c32019-03-26 19:52:28 -0700492 start := time.Now()
493
494 cf := config.NewAdapterFlags()
495 cf.ParseCommandArguments()
496
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700497 // Setup logging
cuilin20187b2a8c32019-03-26 19:52:28 -0700498
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000499 logLevel, err := log.StringToLogLevel(cf.LogLevel)
500 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000501 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000502 }
Rohan Agrawal2488f192020-01-31 09:26:55 +0000503
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700504 // Setup default logger - applies for packages that do not have specific logger set
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000505 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000506 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700507 }
508
509 // Update all loggers (provisionned via init) with a common field
Hardik Windlassb9c869b2019-10-10 08:34:32 +0000510 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000511 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700512 }
513
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000514 log.SetAllLogLevel(logLevel)
Rohan Agrawal93bced32020-02-11 10:16:01 +0000515
Matteo Scandolo8f2b9572020-02-28 15:35:23 -0800516 realMain()
517
Kent Hagermane6ff1012020-07-14 15:07:53 -0400518 defer func() {
519 err := log.CleanUp()
520 if err != nil {
521 logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
522 }
523 }()
cuilin20187b2a8c32019-03-26 19:52:28 -0700524
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400525 // Print version / build information and exit
526 if cf.DisplayVersionOnly {
527 printVersion()
528 return
529 }
530
cuilin20187b2a8c32019-03-26 19:52:28 -0700531 // Print banner if specified
532 if cf.Banner {
533 printBanner()
534 }
535
Neha Sharma96b7bf22020-06-15 10:37:32 +0000536 logger.Infow(ctx, "config", log.Fields{"config": *cf})
cuilin20187b2a8c32019-03-26 19:52:28 -0700537
538 ctx, cancel := context.WithCancel(context.Background())
539 defer cancel()
540
541 ad := newAdapter(cf)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000542
543 p := &probe.Probe{}
Neha Sharma96b7bf22020-06-15 10:37:32 +0000544 go p.ListenAndServe(ctx, ad.config.ProbeAddress)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000545
546 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
547
Girish Kumar935f7af2020-08-18 11:59:42 +0000548 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
Girish Kumar11e15972020-06-15 14:51:10 +0000549 if err != nil {
550 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
551 } else {
552 defer log.TerminateTracing(closer)
553 }
554
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000555 go ad.start(probeCtx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700556
Neha Sharma96b7bf22020-06-15 10:37:32 +0000557 code := waitForExit(ctx)
558 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
cuilin20187b2a8c32019-03-26 19:52:28 -0700559
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700560 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
561 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700562 // Cleanup before leaving
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700563 ad.stop(ctxWithCancel)
564 // Will halt any long-running stop routine gracefully
565 cancelFunc()
cuilin20187b2a8c32019-03-26 19:52:28 -0700566
567 elapsed := time.Since(start)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000568 logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
cuilin20187b2a8c32019-03-26 19:52:28 -0700569}
Joey Armstrong87b55f72023-06-27 12:12:53 -0400570
571// [EOF]