blob: 1d9dc3bff9304c667687c7b38f11b657745e9c43 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
Joey Armstrong89c812c2024-01-12 19:00:20 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Holger Hildebrandtfa074992020-03-27 15:42:06 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Joey Armstrong89c812c2024-01-12 19:00:20 -050017// Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
nikesh.krishnanca4afa32023-06-28 03:42:16 +053031 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
khenaidoo7d3c5582021-08-11 18:09:44 -040032 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040041 "github.com/opencord/voltha-protos/v5/go/adapter_service"
42 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040043 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040044 "github.com/opencord/voltha-protos/v5/go/voltha"
45 "google.golang.org/grpc"
nikesh.krishnanca4afa32023-06-28 03:42:16 +053046 codes "google.golang.org/grpc/codes"
khenaidoo7d3c5582021-08-11 18:09:44 -040047
khenaidoo42dcdfd2021-10-19 17:34:12 -040048 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049
Matteo Scandolo761f7512020-11-23 15:52:40 -080050 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000051 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052)
53
khenaidoo7d3c5582021-08-11 18:09:44 -040054const (
55 clusterMessagingService = "cluster-message-service"
56 onuAdapterService = "onu-adapter-service"
57 kvService = "kv-service"
58 coreService = "core-service"
59)
60
Holger Hildebrandtfa074992020-03-27 15:42:06 +000061type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053062 //defaultAppName string
khenaidoof3333552021-12-15 16:52:31 -050063 instanceID string
64 config *config.AdapterFlags
65 kafkaClient kafka.Client
66 kvClient kvstore.Client
67 eventProxy eventif.EventProxy
68 grpcServer *vgrpc.GrpcServer
69 onuAdapter *ac.OpenONUAC
70 onuInterAdapter *ac.OpenONUACInterAdapter
71 coreClient *vgrpc.Client
Holger Hildebrandtfa074992020-03-27 15:42:06 +000072}
73
Holger Hildebrandtfa074992020-03-27 15:42:06 +000074func newAdapter(cf *config.AdapterFlags) *adapter {
75 var a adapter
76 a.instanceID = cf.InstanceID
77 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000078 return &a
79}
80
81func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000082 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 var err error
84
85 var p *probe.Probe
86 if value := ctx.Value(probe.ProbeContextKey); value != nil {
87 if _, ok := value.(*probe.Probe); ok {
88 p = value.(*probe.Probe)
89 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000090 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040091 clusterMessagingService,
92 kvService,
93 onuAdapterService,
94 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 )
96 }
97 }
98
99 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000100 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
101 if err = a.setKVClient(ctx); err != nil {
102 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000103 }
104
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800106 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000107 go conf.StartLogLevelConfigProcessing(cm, ctx)
108
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000109 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400110 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000111 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000112 }
113
khenaidoo7d3c5582021-08-11 18:09:44 -0400114 // Start kafka communication with the broker
115 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
116 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000117 }
118
khenaidoo7d3c5582021-08-11 18:09:44 -0400119 // Wait until connection to KV store is established
120 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
121 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000122 }
123
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000124 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530125 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400126 go func() {
127 if err := a.eventProxy.Start(); err != nil {
128 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
129 }
130 }()
131
132 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
133 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500134 if a.coreClient, err = vgrpc.NewClient(
135 a.config.AdapterEndpoint,
136 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500137 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500138 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400139 logger.Fatal(ctx, "grpc-client-not-created")
140 }
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530141 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
142 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
143
144 retryCodes := []codes.Code{
145 codes.Unavailable, // server is currently unavailable
146 codes.DeadlineExceeded, // deadline for the operation was exceeded
147 }
148 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
149 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
khenaidoo7d3c5582021-08-11 18:09:44 -0400150 // Start the core grpc client
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530151 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000152
153 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500154 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
155 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
156 }
157
158 // Create the open ONU Inter adapter
159 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
160 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000161 }
162
khenaidoo7d3c5582021-08-11 18:09:44 -0400163 // Create and start the grpc server
164 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
165
166 //Register the adapter service
167 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
168
169 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500170 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400171
172 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000173
174 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400175 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000176 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000177 }
178
khenaidoo7d3c5582021-08-11 18:09:44 -0400179 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000180 a.checkServicesReadiness(ctx)
181 return err
182}
183
khenaidoo7d3c5582021-08-11 18:09:44 -0400184// TODO: Any action the adapter needs to do following a Core restart?
185func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
186 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
187 return nil
188}
189
khenaidoof3333552021-12-15 16:52:31 -0500190// getCoreServiceClientHandler is used to setup the remote gRPC service
191func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
192 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400193 return nil
194 }
khenaidoof3333552021-12-15 16:52:31 -0500195 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400196}
197
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000198func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500199 // Cleanup the grpc services first
200 if err := a.onuAdapter.Stop(ctx); err != nil {
201 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
202 }
203 if err := a.onuInterAdapter.Stop(ctx); err != nil {
204 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
205 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000206 // Cleanup - applies only if we had a kvClient
207 if a.kvClient != nil {
208 // Release all reservations
209 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000211 }
212 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700213 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000214 }
215
khenaidoo7d3c5582021-08-11 18:09:44 -0400216 if a.eventProxy != nil {
217 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000218 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400219
220 if a.kafkaClient != nil {
221 a.kafkaClient.Stop(ctx)
222 }
223
224 // Stop core client
225 if a.coreClient != nil {
226 a.coreClient.Stop(ctx)
227 }
228
229 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230}
231
232// #############################################
233// Adapter Utility methods ##### begin #########
234
dbainbri4d3a0dc2020-12-02 00:33:42 +0000235func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
236 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000238 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000239 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000240 }
241 return nil, errors.New("unsupported-kv-store")
242}
243
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800244func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000245
dbainbri4d3a0dc2020-12-02 00:33:42 +0000246 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000247
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248 switch clientType {
249 case "sarama":
250 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000251 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000252 kafka.ProducerReturnOnErrors(true),
253 kafka.ProducerReturnOnSuccess(true),
254 kafka.ProducerMaxRetries(6),
255 kafka.ProducerRetryBackoff(time.Millisecond*30),
256 kafka.MetadatMaxRetries(15)), nil
257 }
258
259 return nil, errors.New("unsupported-client-type")
260}
261
dbainbri4d3a0dc2020-12-02 00:33:42 +0000262func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800263 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000264 if err != nil {
265 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000266 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000267 return err
268 }
269 a.kvClient = client
270 return nil
271}
272
khenaidoof3333552021-12-15 16:52:31 -0500273func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800274 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000275 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400276 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000277
278 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000279 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000280 return nil, err
281 }
282
dbainbri4d3a0dc2020-12-02 00:33:42 +0000283 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000284 return sAcONU, nil
285}
286
khenaidoof3333552021-12-15 16:52:31 -0500287func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
288 var err error
289 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
290
291 if err = sAcONUInterAdapter.Start(ctx); err != nil {
292 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
293 return nil, err
294 }
295
296 logger.Info(ctx, "OpenONUACInterAdapter-started")
297 return sAcONUInterAdapter, nil
298}
299
khenaidoo7d3c5582021-08-11 18:09:44 -0400300func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000301 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100302 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000303 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000304 "adapterID": adapterID,
305 "currentReplica": a.config.CurrentReplica,
306 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100307 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000308 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700309 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100310 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
311 Vendor: "VOLTHA OpenONUGo",
312 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400313 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700314 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000315 CurrentReplica: int32(a.config.CurrentReplica),
316 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700317 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000318 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100319 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400320 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
321 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000322 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
323 AcceptsAddRemoveFlowUpdates: true}}
324 deviceTypes := &voltha.DeviceTypes{Items: types}
325 count := 0
326 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400327 gClient, err := a.coreClient.GetCoreServiceClient()
328 if gClient != nil {
329 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400330 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400331 Adapter: adapterDescription,
332 DTypes: deviceTypes}); err == nil {
333 break
334 }
335 }
336 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000337 if retries == count {
338 return err
339 }
340 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400341 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000342 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400343
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000344 }
345 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400346 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000347 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000348 return nil
349}
350
khenaidoo7d3c5582021-08-11 18:09:44 -0400351// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
352func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
353 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
354
355 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
356 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
357
358 server.Start(ctx)
359 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
360}
361
khenaidoo42dcdfd2021-10-19 17:34:12 -0400362func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400363 logger.Info(ctx, "adding-adapter-service")
364
365 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400366 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400367 })
368}
369
khenaidoo42dcdfd2021-10-19 17:34:12 -0400370func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400371 logger.Info(ctx, "adding-onu-inter-adapter-service")
372
373 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400374 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400375 })
376}
377
Joey Armstrong89c812c2024-01-12 19:00:20 -0500378/*
379*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000380This function checks the liveliness and readiness of the kakfa and kv-client services
381and update the status in the probe.
382*/
383func (a *adapter) checkServicesReadiness(ctx context.Context) {
384 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400385 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000386
387 // checks the kv-store readiness
388 go a.checkKvStoreReadiness(ctx)
389}
390
Joey Armstrong89c812c2024-01-12 19:00:20 -0500391/*
392*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000393This function checks the liveliness and readiness of the kv-store service
394and update the status in the probe.
395*/
396func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
397 // dividing the live probe interval by 2 to get updated status every 30s
398 timeout := a.config.LiveProbeInterval / 2
399 kvStoreChannel := make(chan bool, 1)
400
khenaidoo7d3c5582021-08-11 18:09:44 -0400401 // Default true - we are here only after we already had a KV store connection
402 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000403 for {
404 timeoutTimer := time.NewTimer(timeout)
405 select {
406 case liveliness := <-kvStoreChannel:
407 if !liveliness {
408 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400409 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410 timeout = a.config.NotLiveProbeInterval
411 } else {
412 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400413 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000414 timeout = a.config.LiveProbeInterval / 2
415 }
416 // Check if the timer has expired or not
417 if !timeoutTimer.Stop() {
418 <-timeoutTimer.C
419 }
420 case <-timeoutTimer.C:
421 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000422 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000423 if a.kvClient.IsConnectionUp(ctx) {
424 kvStoreChannel <- true
425 } else {
426 kvStoreChannel <- false
427 }
428 }
429 }
430}
431
khenaidoo7d3c5582021-08-11 18:09:44 -0400432// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
433// context times out.
434func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
435 if kvClient == nil {
436 return errors.New("kvclient-is-nil")
437 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000438 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400439 if !kvClient.IsConnectionUp(ctx) {
440 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
441 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
442 select {
443 case <-time.After(connectionRetryInterval):
444 continue
445 case <-ctx.Done():
446 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000447 }
448 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400449 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
450 logger.Info(ctx, "kv-connection-up")
451 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000452 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400453 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000454}
455
456// Adapter Utility methods ##### end #########
457// #############################################
458
dbainbri4d3a0dc2020-12-02 00:33:42 +0000459func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000460 if version.VersionInfo.Version == "unknown-version" {
461 content, err := ioutil.ReadFile("VERSION")
462 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100463 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000464 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000465 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000466 }
467 return version.VersionInfo.Version
468}
469
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000470func printVersion(appName string) {
471 fmt.Println(appName)
472 fmt.Println(version.VersionInfo.String(" "))
473}
474
475func printBanner() {
476 fmt.Println(" ____ ____ ___ ___ _ ")
477 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
478 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
479 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
480 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
481 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
482 fmt.Println(" | | __| |")
483 fmt.Println(" |_| |____/")
484 fmt.Println(" ")
485}
486
487func waitForExit(ctx context.Context) int {
488 signalChannel := make(chan os.Signal, 1)
489 signal.Notify(signalChannel,
490 syscall.SIGHUP,
491 syscall.SIGINT,
492 syscall.SIGTERM,
493 syscall.SIGQUIT)
494
495 exitChannel := make(chan int)
496
497 go func() {
498 select {
499 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000500 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000501 exitChannel <- 2
502 case s := <-signalChannel:
503 switch s {
504 case syscall.SIGHUP,
505 syscall.SIGINT,
506 syscall.SIGTERM,
507 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000508 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000509 exitChannel <- 0
510 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000511 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000512 exitChannel <- 1
513 }
514 }
515 }()
516
517 code := <-exitChannel
518 return code
519}
520
521func main() {
522 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000523 ctx, cancel := context.WithCancel(context.Background())
524 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000525
khenaidoo7d3c5582021-08-11 18:09:44 -0400526 cf := &config.AdapterFlags{}
527 cf.ParseCommandArguments(os.Args[1:])
528
dbainbri4d3a0dc2020-12-02 00:33:42 +0000529 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000530
531 // Setup logging
532
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000533 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700534 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000535 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700536 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000537
538 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000539 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000540 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000541 }
542
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000543 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000544 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000545 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000546 }
547
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000548 log.SetAllLogLevel(logLevel)
549
dbainbri4d3a0dc2020-12-02 00:33:42 +0000550 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000551
Himani Chawla4d908332020-08-31 12:30:20 +0530552 defer func() {
553 _ = log.CleanUp()
554 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000555 // Print version / build information and exit
556 if cf.DisplayVersionOnly {
557 printVersion(defaultAppName)
558 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000559 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000560 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
561 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
562 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000563
564 // Print banner if specified
565 if cf.Banner {
566 printBanner()
567 }
568
dbainbri4d3a0dc2020-12-02 00:33:42 +0000569 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000570
571 ad := newAdapter(cf)
572
573 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000574 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000575
dbainbri4d3a0dc2020-12-02 00:33:42 +0000576 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
577 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000578
579 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
580
dbainbri4d3a0dc2020-12-02 00:33:42 +0000581 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
582 if err != nil {
583 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
584 } else {
585 defer log.TerminateTracing(closer)
586 }
587
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000588 go func() {
589 err := ad.start(probeCtx)
590 // If this operation returns an error
591 // cancel all operations using this context
592 if err != nil {
593 cancel()
594 }
595 }()
596
597 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000598 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000599
khenaidoo1fd58e02021-10-13 11:51:20 -0400600 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
601 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
602
Girish Gowdra55507832022-06-01 18:12:06 -0700603 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
604 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000605 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700606 ad.stop(ctxWithCancel)
607 // Will halt any long-running stop routine gracefully
608 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000609
610 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000611 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
612 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000613}