blob: e298ccc962f9609543124674782a74851386988d [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
Joey Armstrong89c812c2024-01-12 19:00:20 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Holger Hildebrandtfa074992020-03-27 15:42:06 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Joey Armstrong89c812c2024-01-12 19:00:20 -050017// Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
nikesh.krishnanca4afa32023-06-28 03:42:16 +053031 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
khenaidoo7d3c5582021-08-11 18:09:44 -040032 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040041 "github.com/opencord/voltha-protos/v5/go/adapter_service"
42 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040043 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040044 "github.com/opencord/voltha-protos/v5/go/voltha"
45 "google.golang.org/grpc"
nikesh.krishnanca4afa32023-06-28 03:42:16 +053046 codes "google.golang.org/grpc/codes"
khenaidoo7d3c5582021-08-11 18:09:44 -040047
khenaidoo42dcdfd2021-10-19 17:34:12 -040048 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049
Matteo Scandolo761f7512020-11-23 15:52:40 -080050 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000051 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052)
53
khenaidoo7d3c5582021-08-11 18:09:44 -040054const (
55 clusterMessagingService = "cluster-message-service"
56 onuAdapterService = "onu-adapter-service"
57 kvService = "kv-service"
58 coreService = "core-service"
59)
60
Holger Hildebrandtfa074992020-03-27 15:42:06 +000061type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053062 //defaultAppName string
khenaidoof3333552021-12-15 16:52:31 -050063 instanceID string
64 config *config.AdapterFlags
65 kafkaClient kafka.Client
66 kvClient kvstore.Client
67 eventProxy eventif.EventProxy
68 grpcServer *vgrpc.GrpcServer
69 onuAdapter *ac.OpenONUAC
70 onuInterAdapter *ac.OpenONUACInterAdapter
71 coreClient *vgrpc.Client
Holger Hildebrandtfa074992020-03-27 15:42:06 +000072}
73
Holger Hildebrandtfa074992020-03-27 15:42:06 +000074func newAdapter(cf *config.AdapterFlags) *adapter {
75 var a adapter
76 a.instanceID = cf.InstanceID
77 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000078 return &a
79}
80
81func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000082 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 var err error
84
85 var p *probe.Probe
86 if value := ctx.Value(probe.ProbeContextKey); value != nil {
87 if _, ok := value.(*probe.Probe); ok {
88 p = value.(*probe.Probe)
89 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000090 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040091 clusterMessagingService,
92 kvService,
93 onuAdapterService,
94 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 )
96 }
97 }
98
99 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000100 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
101 if err = a.setKVClient(ctx); err != nil {
102 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000103 }
104
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800106 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000107 go conf.StartLogLevelConfigProcessing(cm, ctx)
108
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000109 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400110 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000111 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000112 }
113
khenaidoo7d3c5582021-08-11 18:09:44 -0400114 // Start kafka communication with the broker
115 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
116 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000117 }
118
khenaidoo7d3c5582021-08-11 18:09:44 -0400119 // Wait until connection to KV store is established
120 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
121 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000122 }
123
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000124 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530125 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400126 go func() {
127 if err := a.eventProxy.Start(); err != nil {
128 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
129 }
130 }()
131
132 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
133 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500134 if a.coreClient, err = vgrpc.NewClient(
135 a.config.AdapterEndpoint,
136 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500137 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500138 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400139 logger.Fatal(ctx, "grpc-client-not-created")
140 }
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530141 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
142 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
143
144 retryCodes := []codes.Code{
145 codes.Unavailable, // server is currently unavailable
146 codes.DeadlineExceeded, // deadline for the operation was exceeded
147 }
148 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
149 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
khenaidoo7d3c5582021-08-11 18:09:44 -0400150 // Start the core grpc client
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530151 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000152
153 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500154 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
155 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
156 }
157
158 // Create the open ONU Inter adapter
159 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
160 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000161 }
162
khenaidoo7d3c5582021-08-11 18:09:44 -0400163 // Create and start the grpc server
164 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
165
166 //Register the adapter service
167 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
168
169 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500170 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400171
172 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000173
174 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400175 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000176 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000177 }
178
khenaidoo7d3c5582021-08-11 18:09:44 -0400179 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000180 a.checkServicesReadiness(ctx)
181 return err
182}
183
khenaidoo7d3c5582021-08-11 18:09:44 -0400184// TODO: Any action the adapter needs to do following a Core restart?
185func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
186 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
187 return nil
188}
189
khenaidoof3333552021-12-15 16:52:31 -0500190// getCoreServiceClientHandler is used to setup the remote gRPC service
191func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
192 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400193 return nil
194 }
khenaidoof3333552021-12-15 16:52:31 -0500195 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400196}
197
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000198func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500199 // Cleanup the grpc services first
200 if err := a.onuAdapter.Stop(ctx); err != nil {
201 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
202 }
203 if err := a.onuInterAdapter.Stop(ctx); err != nil {
204 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
205 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000206 // Cleanup - applies only if we had a kvClient
207 if a.kvClient != nil {
208 // Release all reservations
209 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000211 }
212 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700213 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000214 }
215
khenaidoo7d3c5582021-08-11 18:09:44 -0400216 if a.eventProxy != nil {
217 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000218 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400219
220 if a.kafkaClient != nil {
221 a.kafkaClient.Stop(ctx)
222 }
223
224 // Stop core client
225 if a.coreClient != nil {
226 a.coreClient.Stop(ctx)
227 }
228
229 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230}
231
232// #############################################
233// Adapter Utility methods ##### begin #########
234
dbainbri4d3a0dc2020-12-02 00:33:42 +0000235func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
236 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000238 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000239 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Abhay Kumar3282a142024-07-12 06:03:12 +0530240 case "redis":
241 return kvstore.NewRedisClient(address, timeout, false)
242 case "redis-sentinel":
243 return kvstore.NewRedisClient(address, timeout, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000244 }
245 return nil, errors.New("unsupported-kv-store")
246}
247
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800248func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000249
dbainbri4d3a0dc2020-12-02 00:33:42 +0000250 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000251
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000252 switch clientType {
253 case "sarama":
254 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000255 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000256 kafka.ProducerReturnOnErrors(true),
257 kafka.ProducerReturnOnSuccess(true),
258 kafka.ProducerMaxRetries(6),
259 kafka.ProducerRetryBackoff(time.Millisecond*30),
260 kafka.MetadatMaxRetries(15)), nil
261 }
262
263 return nil, errors.New("unsupported-client-type")
264}
265
dbainbri4d3a0dc2020-12-02 00:33:42 +0000266func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800267 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000268 if err != nil {
269 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000270 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 return err
272 }
273 a.kvClient = client
274 return nil
275}
276
khenaidoof3333552021-12-15 16:52:31 -0500277func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800278 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000279 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400280 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000281
282 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000283 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000284 return nil, err
285 }
286
dbainbri4d3a0dc2020-12-02 00:33:42 +0000287 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000288 return sAcONU, nil
289}
290
khenaidoof3333552021-12-15 16:52:31 -0500291func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
292 var err error
293 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
294
295 if err = sAcONUInterAdapter.Start(ctx); err != nil {
296 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
297 return nil, err
298 }
299
300 logger.Info(ctx, "OpenONUACInterAdapter-started")
301 return sAcONUInterAdapter, nil
302}
303
khenaidoo7d3c5582021-08-11 18:09:44 -0400304func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000305 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100306 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000307 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000308 "adapterID": adapterID,
309 "currentReplica": a.config.CurrentReplica,
310 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100311 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000312 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700313 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100314 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
315 Vendor: "VOLTHA OpenONUGo",
316 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400317 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700318 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000319 CurrentReplica: int32(a.config.CurrentReplica),
320 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700321 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000322 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100323 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400324 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
325 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000326 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
327 AcceptsAddRemoveFlowUpdates: true}}
328 deviceTypes := &voltha.DeviceTypes{Items: types}
329 count := 0
330 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400331 gClient, err := a.coreClient.GetCoreServiceClient()
332 if gClient != nil {
333 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400334 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400335 Adapter: adapterDescription,
336 DTypes: deviceTypes}); err == nil {
337 break
338 }
339 }
340 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000341 if retries == count {
342 return err
343 }
344 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400345 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000346 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400347
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000348 }
349 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400350 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000351 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000352 return nil
353}
354
khenaidoo7d3c5582021-08-11 18:09:44 -0400355// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
356func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
357 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
358
359 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
360 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
361
362 server.Start(ctx)
363 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
364}
365
khenaidoo42dcdfd2021-10-19 17:34:12 -0400366func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400367 logger.Info(ctx, "adding-adapter-service")
368
369 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400370 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400371 })
372}
373
khenaidoo42dcdfd2021-10-19 17:34:12 -0400374func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400375 logger.Info(ctx, "adding-onu-inter-adapter-service")
376
377 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400378 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400379 })
380}
381
Joey Armstrong89c812c2024-01-12 19:00:20 -0500382/*
383*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000384This function checks the liveliness and readiness of the kakfa and kv-client services
385and update the status in the probe.
386*/
387func (a *adapter) checkServicesReadiness(ctx context.Context) {
388 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400389 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000390
391 // checks the kv-store readiness
392 go a.checkKvStoreReadiness(ctx)
393}
394
Joey Armstrong89c812c2024-01-12 19:00:20 -0500395/*
396*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000397This function checks the liveliness and readiness of the kv-store service
398and update the status in the probe.
399*/
400func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
401 // dividing the live probe interval by 2 to get updated status every 30s
402 timeout := a.config.LiveProbeInterval / 2
403 kvStoreChannel := make(chan bool, 1)
404
khenaidoo7d3c5582021-08-11 18:09:44 -0400405 // Default true - we are here only after we already had a KV store connection
406 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000407 for {
408 timeoutTimer := time.NewTimer(timeout)
409 select {
410 case liveliness := <-kvStoreChannel:
411 if !liveliness {
412 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400413 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000414 timeout = a.config.NotLiveProbeInterval
415 } else {
416 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400417 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000418 timeout = a.config.LiveProbeInterval / 2
419 }
420 // Check if the timer has expired or not
421 if !timeoutTimer.Stop() {
422 <-timeoutTimer.C
423 }
424 case <-timeoutTimer.C:
425 // Check the status of the kv-store
Akash Soni840f8d62024-12-11 19:37:06 +0530426 logger.Debug(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000427 if a.kvClient.IsConnectionUp(ctx) {
428 kvStoreChannel <- true
429 } else {
430 kvStoreChannel <- false
431 }
432 }
433 }
434}
435
khenaidoo7d3c5582021-08-11 18:09:44 -0400436// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
437// context times out.
438func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
439 if kvClient == nil {
440 return errors.New("kvclient-is-nil")
441 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000442 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400443 if !kvClient.IsConnectionUp(ctx) {
444 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
445 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
446 select {
447 case <-time.After(connectionRetryInterval):
448 continue
449 case <-ctx.Done():
450 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000451 }
452 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400453 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
454 logger.Info(ctx, "kv-connection-up")
455 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000456 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400457 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000458}
459
460// Adapter Utility methods ##### end #########
461// #############################################
462
dbainbri4d3a0dc2020-12-02 00:33:42 +0000463func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000464 if version.VersionInfo.Version == "unknown-version" {
465 content, err := ioutil.ReadFile("VERSION")
466 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100467 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000468 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000469 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000470 }
471 return version.VersionInfo.Version
472}
473
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000474func printVersion(appName string) {
475 fmt.Println(appName)
476 fmt.Println(version.VersionInfo.String(" "))
477}
478
479func printBanner() {
480 fmt.Println(" ____ ____ ___ ___ _ ")
481 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
482 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
483 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
484 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
485 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
486 fmt.Println(" | | __| |")
487 fmt.Println(" |_| |____/")
488 fmt.Println(" ")
489}
490
491func waitForExit(ctx context.Context) int {
492 signalChannel := make(chan os.Signal, 1)
493 signal.Notify(signalChannel,
494 syscall.SIGHUP,
495 syscall.SIGINT,
496 syscall.SIGTERM,
497 syscall.SIGQUIT)
498
499 exitChannel := make(chan int)
500
501 go func() {
502 select {
503 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000504 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000505 exitChannel <- 2
506 case s := <-signalChannel:
507 switch s {
508 case syscall.SIGHUP,
509 syscall.SIGINT,
510 syscall.SIGTERM,
511 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000512 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000513 exitChannel <- 0
514 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000515 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000516 exitChannel <- 1
517 }
518 }
519 }()
520
521 code := <-exitChannel
522 return code
523}
524
525func main() {
526 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000527 ctx, cancel := context.WithCancel(context.Background())
528 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000529
khenaidoo7d3c5582021-08-11 18:09:44 -0400530 cf := &config.AdapterFlags{}
531 cf.ParseCommandArguments(os.Args[1:])
532
dbainbri4d3a0dc2020-12-02 00:33:42 +0000533 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000534
535 // Setup logging
536
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000537 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700538 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000539 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700540 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000541
542 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000543 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000544 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000545 }
546
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000547 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000548 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000549 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000550 }
551
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000552 log.SetAllLogLevel(logLevel)
553
dbainbri4d3a0dc2020-12-02 00:33:42 +0000554 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000555
Himani Chawla4d908332020-08-31 12:30:20 +0530556 defer func() {
557 _ = log.CleanUp()
558 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000559 // Print version / build information and exit
560 if cf.DisplayVersionOnly {
561 printVersion(defaultAppName)
562 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000563 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000564 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
565 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
566 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000567
568 // Print banner if specified
569 if cf.Banner {
570 printBanner()
571 }
572
dbainbri4d3a0dc2020-12-02 00:33:42 +0000573 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000574
575 ad := newAdapter(cf)
576
577 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000578 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000579
dbainbri4d3a0dc2020-12-02 00:33:42 +0000580 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
581 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000582
583 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
584
dbainbri4d3a0dc2020-12-02 00:33:42 +0000585 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
586 if err != nil {
587 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
588 } else {
589 defer log.TerminateTracing(closer)
590 }
591
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000592 go func() {
593 err := ad.start(probeCtx)
594 // If this operation returns an error
595 // cancel all operations using this context
596 if err != nil {
597 cancel()
598 }
599 }()
600
601 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000602 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000603
khenaidoo1fd58e02021-10-13 11:51:20 -0400604 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
605 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
606
Girish Gowdra55507832022-06-01 18:12:06 -0700607 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
608 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000609 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700610 ad.stop(ctxWithCancel)
611 // Will halt any long-running stop routine gracefully
612 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000613
614 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000615 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
616 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000617}