blob: a21629f1a2d19caf49aaefb7821e166dc54fcf58 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
Joey Armstrong89c812c2024-01-12 19:00:20 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Holger Hildebrandtfa074992020-03-27 15:42:06 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Joey Armstrong89c812c2024-01-12 19:00:20 -050017// Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "os"
25 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010026 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000027 "syscall"
28 "time"
29
nikesh.krishnanca4afa32023-06-28 03:42:16 +053030 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
khenaidoo7d3c5582021-08-11 18:09:44 -040031 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
35 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
36 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v7/pkg/log"
38 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040040 "github.com/opencord/voltha-protos/v5/go/adapter_service"
41 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040042 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040043 "github.com/opencord/voltha-protos/v5/go/voltha"
44 "google.golang.org/grpc"
nikesh.krishnanca4afa32023-06-28 03:42:16 +053045 codes "google.golang.org/grpc/codes"
khenaidoo7d3c5582021-08-11 18:09:44 -040046
khenaidoo42dcdfd2021-10-19 17:34:12 -040047 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000048
Matteo Scandolo761f7512020-11-23 15:52:40 -080049 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000050 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000051)
52
// Service names under which the adapter's dependencies and its own gRPC
// service are registered with, and reported to, the probe.
const (
	// clusterMessagingService is the probe name used for the Kafka connection.
	clusterMessagingService = "cluster-message-service"
	// onuAdapterService is the probe name used for this adapter's gRPC server.
	onuAdapterService = "onu-adapter-service"
	// kvService is the probe name used for the KV store connection.
	kvService = "kv-service"
	// coreService is the probe name used for the registration with the Core.
	coreService = "core-service"
)
59
Holger Hildebrandtfa074992020-03-27 15:42:06 +000060type adapter struct {
khenaidoof3333552021-12-15 16:52:31 -050061 kafkaClient kafka.Client
62 kvClient kvstore.Client
63 eventProxy eventif.EventProxy
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053064 config *config.AdapterFlags
khenaidoof3333552021-12-15 16:52:31 -050065 grpcServer *vgrpc.GrpcServer
66 onuAdapter *ac.OpenONUAC
67 onuInterAdapter *ac.OpenONUACInterAdapter
68 coreClient *vgrpc.Client
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053069 //defaultAppName string
70 instanceID string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000071}
72
Holger Hildebrandtfa074992020-03-27 15:42:06 +000073func newAdapter(cf *config.AdapterFlags) *adapter {
74 var a adapter
75 a.instanceID = cf.InstanceID
76 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000077 return &a
78}
79
80func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000081 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000082 var err error
83
84 var p *probe.Probe
85 if value := ctx.Value(probe.ProbeContextKey); value != nil {
86 if _, ok := value.(*probe.Probe); ok {
87 p = value.(*probe.Probe)
88 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000089 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040090 clusterMessagingService,
91 kvService,
92 onuAdapterService,
93 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000094 )
95 }
96 }
97
98 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000099 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
100 if err = a.setKVClient(ctx); err != nil {
101 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000102 }
103
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000104 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800105 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 go conf.StartLogLevelConfigProcessing(cm, ctx)
107
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400109 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000110 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 }
112
khenaidoo7d3c5582021-08-11 18:09:44 -0400113 // Start kafka communication with the broker
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530114 if err = kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400115 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000116 }
117
khenaidoo7d3c5582021-08-11 18:09:44 -0400118 // Wait until connection to KV store is established
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530119 if err = WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400120 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000121 }
122
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000123 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530124 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400125 go func() {
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530126 if err = a.eventProxy.Start(); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400127 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
128 }
129 }()
130
131 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
132 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500133 if a.coreClient, err = vgrpc.NewClient(
134 a.config.AdapterEndpoint,
135 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500136 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500137 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400138 logger.Fatal(ctx, "grpc-client-not-created")
139 }
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530140 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
141 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
142
143 retryCodes := []codes.Code{
144 codes.Unavailable, // server is currently unavailable
145 codes.DeadlineExceeded, // deadline for the operation was exceeded
146 }
147 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
148 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
khenaidoo7d3c5582021-08-11 18:09:44 -0400149 // Start the core grpc client
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530150 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000151
152 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500153 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
154 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
155 }
156
157 // Create the open ONU Inter adapter
158 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
159 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000160 }
161
khenaidoo7d3c5582021-08-11 18:09:44 -0400162 // Create and start the grpc server
163 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
164
165 //Register the adapter service
166 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
167
168 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500169 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400170
171 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000172
173 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400174 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000175 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000176 }
177
khenaidoo7d3c5582021-08-11 18:09:44 -0400178 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000179 a.checkServicesReadiness(ctx)
180 return err
181}
182
khenaidoo7d3c5582021-08-11 18:09:44 -0400183// TODO: Any action the adapter needs to do following a Core restart?
184func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
185 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
186 return nil
187}
188
khenaidoof3333552021-12-15 16:52:31 -0500189// getCoreServiceClientHandler is used to setup the remote gRPC service
190func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
191 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400192 return nil
193 }
khenaidoof3333552021-12-15 16:52:31 -0500194 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400195}
196
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000197func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500198 // Cleanup the grpc services first
199 if err := a.onuAdapter.Stop(ctx); err != nil {
200 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
201 }
202 if err := a.onuInterAdapter.Stop(ctx); err != nil {
203 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
204 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000205 // Cleanup - applies only if we had a kvClient
206 if a.kvClient != nil {
207 // Release all reservations
208 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000209 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000210 }
211 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700212 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000213 }
214
khenaidoo7d3c5582021-08-11 18:09:44 -0400215 if a.eventProxy != nil {
216 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000217 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400218
219 if a.kafkaClient != nil {
220 a.kafkaClient.Stop(ctx)
221 }
222
223 // Stop core client
224 if a.coreClient != nil {
225 a.coreClient.Stop(ctx)
226 }
227
228 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000229}
230
231// #############################################
232// Adapter Utility methods ##### begin #########
233
dbainbri4d3a0dc2020-12-02 00:33:42 +0000234func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
235 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000236 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000238 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Abhay Kumar3282a142024-07-12 06:03:12 +0530239 case "redis":
240 return kvstore.NewRedisClient(address, timeout, false)
241 case "redis-sentinel":
242 return kvstore.NewRedisClient(address, timeout, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000243 }
244 return nil, errors.New("unsupported-kv-store")
245}
246
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800247func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248
dbainbri4d3a0dc2020-12-02 00:33:42 +0000249 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000250
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000251 switch clientType {
252 case "sarama":
253 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000254 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000255 kafka.ProducerReturnOnErrors(true),
256 kafka.ProducerReturnOnSuccess(true),
257 kafka.ProducerMaxRetries(6),
258 kafka.ProducerRetryBackoff(time.Millisecond*30),
259 kafka.MetadatMaxRetries(15)), nil
260 }
261
262 return nil, errors.New("unsupported-client-type")
263}
264
dbainbri4d3a0dc2020-12-02 00:33:42 +0000265func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800266 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000267 if err != nil {
268 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000269 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000270 return err
271 }
272 a.kvClient = client
273 return nil
274}
275
khenaidoof3333552021-12-15 16:52:31 -0500276func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800277 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000278 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400279 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000280
281 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000282 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000283 return nil, err
284 }
285
dbainbri4d3a0dc2020-12-02 00:33:42 +0000286 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000287 return sAcONU, nil
288}
289
khenaidoof3333552021-12-15 16:52:31 -0500290func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
291 var err error
292 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
293
294 if err = sAcONUInterAdapter.Start(ctx); err != nil {
295 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
296 return nil, err
297 }
298
299 logger.Info(ctx, "OpenONUACInterAdapter-started")
300 return sAcONUInterAdapter, nil
301}
302
khenaidoo7d3c5582021-08-11 18:09:44 -0400303func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000304 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100305 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000306 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000307 "adapterID": adapterID,
308 "currentReplica": a.config.CurrentReplica,
309 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100310 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000311 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700312 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100313 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
314 Vendor: "VOLTHA OpenONUGo",
315 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400316 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700317 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000318 CurrentReplica: int32(a.config.CurrentReplica),
319 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700320 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000321 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100322 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400323 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
324 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000325 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
326 AcceptsAddRemoveFlowUpdates: true}}
327 deviceTypes := &voltha.DeviceTypes{Items: types}
328 count := 0
329 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400330 gClient, err := a.coreClient.GetCoreServiceClient()
331 if gClient != nil {
332 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400333 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400334 Adapter: adapterDescription,
335 DTypes: deviceTypes}); err == nil {
336 break
337 }
338 }
339 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000340 if retries == count {
341 return err
342 }
343 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400344 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000345 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400346
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000347 }
348 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400349 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000350 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000351 return nil
352}
353
khenaidoo7d3c5582021-08-11 18:09:44 -0400354// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
355func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
356 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
357
358 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
359 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
360
361 server.Start(ctx)
362 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
363}
364
khenaidoo42dcdfd2021-10-19 17:34:12 -0400365func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400366 logger.Info(ctx, "adding-adapter-service")
367
368 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400369 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400370 })
371}
372
khenaidoo42dcdfd2021-10-19 17:34:12 -0400373func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400374 logger.Info(ctx, "adding-onu-inter-adapter-service")
375
376 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400377 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400378 })
379}
380
Joey Armstrong89c812c2024-01-12 19:00:20 -0500381/*
382*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000383This function checks the liveliness and readiness of the kakfa and kv-client services
384and update the status in the probe.
385*/
386func (a *adapter) checkServicesReadiness(ctx context.Context) {
387 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400388 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000389
390 // checks the kv-store readiness
391 go a.checkKvStoreReadiness(ctx)
392}
393
Joey Armstrong89c812c2024-01-12 19:00:20 -0500394/*
395*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000396This function checks the liveliness and readiness of the kv-store service
397and update the status in the probe.
398*/
399func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
400 // dividing the live probe interval by 2 to get updated status every 30s
401 timeout := a.config.LiveProbeInterval / 2
402 kvStoreChannel := make(chan bool, 1)
403
khenaidoo7d3c5582021-08-11 18:09:44 -0400404 // Default true - we are here only after we already had a KV store connection
405 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000406 for {
407 timeoutTimer := time.NewTimer(timeout)
408 select {
409 case liveliness := <-kvStoreChannel:
410 if !liveliness {
411 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400412 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000413 timeout = a.config.NotLiveProbeInterval
414 } else {
415 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400416 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000417 timeout = a.config.LiveProbeInterval / 2
418 }
419 // Check if the timer has expired or not
420 if !timeoutTimer.Stop() {
421 <-timeoutTimer.C
422 }
423 case <-timeoutTimer.C:
424 // Check the status of the kv-store
Akash Soni840f8d62024-12-11 19:37:06 +0530425 logger.Debug(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000426 if a.kvClient.IsConnectionUp(ctx) {
427 kvStoreChannel <- true
428 } else {
429 kvStoreChannel <- false
430 }
431 }
432 }
433}
434
khenaidoo7d3c5582021-08-11 18:09:44 -0400435// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
436// context times out.
437func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
438 if kvClient == nil {
439 return errors.New("kvclient-is-nil")
440 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000441 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400442 if !kvClient.IsConnectionUp(ctx) {
443 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
444 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
445 select {
446 case <-time.After(connectionRetryInterval):
447 continue
448 case <-ctx.Done():
449 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000450 }
451 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400452 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
453 logger.Info(ctx, "kv-connection-up")
454 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000455 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400456 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000457}
458
459// Adapter Utility methods ##### end #########
460// #############################################
461
dbainbri4d3a0dc2020-12-02 00:33:42 +0000462func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000463 if version.VersionInfo.Version == "unknown-version" {
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530464 content, err := os.ReadFile("VERSION")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000465 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100466 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000467 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000468 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000469 }
470 return version.VersionInfo.Version
471}
472
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000473func printVersion(appName string) {
474 fmt.Println(appName)
475 fmt.Println(version.VersionInfo.String(" "))
476}
477
// printBanner writes the ASCII-art start-up banner to standard output, one
// line at a time.
func printBanner() {
	bannerLines := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, line := range bannerLines {
		fmt.Println(line)
	}
}
489
490func waitForExit(ctx context.Context) int {
491 signalChannel := make(chan os.Signal, 1)
492 signal.Notify(signalChannel,
493 syscall.SIGHUP,
494 syscall.SIGINT,
495 syscall.SIGTERM,
496 syscall.SIGQUIT)
497
498 exitChannel := make(chan int)
499
500 go func() {
501 select {
502 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000503 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000504 exitChannel <- 2
505 case s := <-signalChannel:
506 switch s {
507 case syscall.SIGHUP,
508 syscall.SIGINT,
509 syscall.SIGTERM,
510 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000511 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000512 exitChannel <- 0
513 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000514 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000515 exitChannel <- 1
516 }
517 }
518 }()
519
520 code := <-exitChannel
521 return code
522}
523
524func main() {
525 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000526 ctx, cancel := context.WithCancel(context.Background())
527 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000528
khenaidoo7d3c5582021-08-11 18:09:44 -0400529 cf := &config.AdapterFlags{}
530 cf.ParseCommandArguments(os.Args[1:])
531
dbainbri4d3a0dc2020-12-02 00:33:42 +0000532 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000533
534 // Setup logging
535
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000536 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700537 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000538 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700539 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000540
541 // Setup default logger - applies for packages that do not have specific logger set
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530542 if _, err = log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000543 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000544 }
545
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000546 // Update all loggers (provisioned via init) with a common field
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530547 if err = log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000548 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000549 }
550
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000551 log.SetAllLogLevel(logLevel)
552
dbainbri4d3a0dc2020-12-02 00:33:42 +0000553 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000554
Himani Chawla4d908332020-08-31 12:30:20 +0530555 defer func() {
556 _ = log.CleanUp()
557 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000558 // Print version / build information and exit
559 if cf.DisplayVersionOnly {
560 printVersion(defaultAppName)
561 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000562 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000563 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
564 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
565 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000566
567 // Print banner if specified
568 if cf.Banner {
569 printBanner()
570 }
571
dbainbri4d3a0dc2020-12-02 00:33:42 +0000572 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000573
574 ad := newAdapter(cf)
575
576 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000577 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000578
dbainbri4d3a0dc2020-12-02 00:33:42 +0000579 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
580 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000581
582 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
583
dbainbri4d3a0dc2020-12-02 00:33:42 +0000584 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
585 if err != nil {
586 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
587 } else {
588 defer log.TerminateTracing(closer)
589 }
590
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000591 go func() {
592 err := ad.start(probeCtx)
593 // If this operation returns an error
594 // cancel all operations using this context
595 if err != nil {
596 cancel()
597 }
598 }()
599
600 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000601 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000602
khenaidoo1fd58e02021-10-13 11:51:20 -0400603 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
604 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
605
Girish Gowdra55507832022-06-01 18:12:06 -0700606 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
607 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000608 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700609 ad.stop(ctxWithCancel)
610 // Will halt any long-running stop routine gracefully
611 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000612
613 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000614 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
615 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000616}