blob: 8aa6c1cef2ab7f2cb6595b96b1468f4ed0534c1e [file] [log] [blame]
/*
 * Copyright 2020-2023 Open Networking Foundation (ONF) and the ONF Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
// Package main is the entry point of the OpenOnuAdapter.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
nikesh.krishnanca4afa32023-06-28 03:42:16 +053031 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
khenaidoo7d3c5582021-08-11 18:09:44 -040032 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040041 "github.com/opencord/voltha-protos/v5/go/adapter_service"
42 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040043 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040044 "github.com/opencord/voltha-protos/v5/go/voltha"
45 "google.golang.org/grpc"
nikesh.krishnanca4afa32023-06-28 03:42:16 +053046 codes "google.golang.org/grpc/codes"
khenaidoo7d3c5582021-08-11 18:09:44 -040047
khenaidoo42dcdfd2021-10-19 17:34:12 -040048 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049
Matteo Scandolo761f7512020-11-23 15:52:40 -080050 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000051 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052)
53
// Service names under which the adapter's dependencies are registered with,
// and reported to, the health probe.
const (
	coreService             = "core-service"
	kvService               = "kv-service"
	onuAdapterService       = "onu-adapter-service"
	clusterMessagingService = "cluster-message-service"
)
60
Holger Hildebrandtfa074992020-03-27 15:42:06 +000061type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053062 //defaultAppName string
khenaidoof3333552021-12-15 16:52:31 -050063 instanceID string
64 config *config.AdapterFlags
65 kafkaClient kafka.Client
66 kvClient kvstore.Client
67 eventProxy eventif.EventProxy
68 grpcServer *vgrpc.GrpcServer
69 onuAdapter *ac.OpenONUAC
70 onuInterAdapter *ac.OpenONUACInterAdapter
71 coreClient *vgrpc.Client
Holger Hildebrandtfa074992020-03-27 15:42:06 +000072}
73
Holger Hildebrandtfa074992020-03-27 15:42:06 +000074func newAdapter(cf *config.AdapterFlags) *adapter {
75 var a adapter
76 a.instanceID = cf.InstanceID
77 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000078 return &a
79}
80
81func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000082 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 var err error
84
85 var p *probe.Probe
86 if value := ctx.Value(probe.ProbeContextKey); value != nil {
87 if _, ok := value.(*probe.Probe); ok {
88 p = value.(*probe.Probe)
89 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000090 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040091 clusterMessagingService,
92 kvService,
93 onuAdapterService,
94 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 )
96 }
97 }
98
99 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000100 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
101 if err = a.setKVClient(ctx); err != nil {
102 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000103 }
104
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800106 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000107 go conf.StartLogLevelConfigProcessing(cm, ctx)
108
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000109 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400110 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000111 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000112 }
113
khenaidoo7d3c5582021-08-11 18:09:44 -0400114 // Start kafka communication with the broker
115 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
116 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000117 }
118
khenaidoo7d3c5582021-08-11 18:09:44 -0400119 // Wait until connection to KV store is established
120 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
121 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000122 }
123
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000124 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530125 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400126 go func() {
127 if err := a.eventProxy.Start(); err != nil {
128 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
129 }
130 }()
131
132 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
133 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500134 if a.coreClient, err = vgrpc.NewClient(
135 a.config.AdapterEndpoint,
136 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500137 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500138 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400139 logger.Fatal(ctx, "grpc-client-not-created")
140 }
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530141 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
142 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
143
144 retryCodes := []codes.Code{
145 codes.Unavailable, // server is currently unavailable
146 codes.DeadlineExceeded, // deadline for the operation was exceeded
147 }
148 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
149 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
khenaidoo7d3c5582021-08-11 18:09:44 -0400150 // Start the core grpc client
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530151 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000152
153 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500154 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
155 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
156 }
157
158 // Create the open ONU Inter adapter
159 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
160 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000161 }
162
khenaidoo7d3c5582021-08-11 18:09:44 -0400163 // Create and start the grpc server
164 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
165
166 //Register the adapter service
167 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
168
169 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500170 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400171
172 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000173
174 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400175 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000176 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000177 }
178
khenaidoo7d3c5582021-08-11 18:09:44 -0400179 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000180 a.checkServicesReadiness(ctx)
181 return err
182}
183
khenaidoo7d3c5582021-08-11 18:09:44 -0400184// TODO: Any action the adapter needs to do following a Core restart?
185func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
186 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
187 return nil
188}
189
khenaidoof3333552021-12-15 16:52:31 -0500190// getCoreServiceClientHandler is used to setup the remote gRPC service
191func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
192 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400193 return nil
194 }
khenaidoof3333552021-12-15 16:52:31 -0500195 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400196}
197
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000198func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500199 // Cleanup the grpc services first
200 if err := a.onuAdapter.Stop(ctx); err != nil {
201 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
202 }
203 if err := a.onuInterAdapter.Stop(ctx); err != nil {
204 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
205 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000206 // Cleanup - applies only if we had a kvClient
207 if a.kvClient != nil {
208 // Release all reservations
209 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000211 }
212 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700213 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000214 }
215
khenaidoo7d3c5582021-08-11 18:09:44 -0400216 if a.eventProxy != nil {
217 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000218 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400219
220 if a.kafkaClient != nil {
221 a.kafkaClient.Stop(ctx)
222 }
223
224 // Stop core client
225 if a.coreClient != nil {
226 a.coreClient.Stop(ctx)
227 }
228
229 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230}
231
232// #############################################
233// Adapter Utility methods ##### begin #########
234
dbainbri4d3a0dc2020-12-02 00:33:42 +0000235func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
236 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000238 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000239 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000240 }
241 return nil, errors.New("unsupported-kv-store")
242}
243
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800244func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000245
dbainbri4d3a0dc2020-12-02 00:33:42 +0000246 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000247
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248 switch clientType {
249 case "sarama":
250 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000251 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000252 kafka.ProducerReturnOnErrors(true),
253 kafka.ProducerReturnOnSuccess(true),
254 kafka.ProducerMaxRetries(6),
255 kafka.ProducerRetryBackoff(time.Millisecond*30),
256 kafka.MetadatMaxRetries(15)), nil
257 }
258
259 return nil, errors.New("unsupported-client-type")
260}
261
dbainbri4d3a0dc2020-12-02 00:33:42 +0000262func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800263 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000264 if err != nil {
265 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000266 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000267 return err
268 }
269 a.kvClient = client
270 return nil
271}
272
khenaidoof3333552021-12-15 16:52:31 -0500273func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800274 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000275 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400276 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000277
278 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000279 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000280 return nil, err
281 }
282
dbainbri4d3a0dc2020-12-02 00:33:42 +0000283 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000284 return sAcONU, nil
285}
286
khenaidoof3333552021-12-15 16:52:31 -0500287func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
288 var err error
289 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
290
291 if err = sAcONUInterAdapter.Start(ctx); err != nil {
292 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
293 return nil, err
294 }
295
296 logger.Info(ctx, "OpenONUACInterAdapter-started")
297 return sAcONUInterAdapter, nil
298}
299
khenaidoo7d3c5582021-08-11 18:09:44 -0400300func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000301 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100302 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000303 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000304 "adapterID": adapterID,
305 "currentReplica": a.config.CurrentReplica,
306 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100307 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000308 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700309 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100310 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
311 Vendor: "VOLTHA OpenONUGo",
312 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400313 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700314 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000315 CurrentReplica: int32(a.config.CurrentReplica),
316 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700317 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000318 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100319 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400320 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
321 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000322 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
323 AcceptsAddRemoveFlowUpdates: true}}
324 deviceTypes := &voltha.DeviceTypes{Items: types}
325 count := 0
326 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400327 gClient, err := a.coreClient.GetCoreServiceClient()
328 if gClient != nil {
329 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400330 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400331 Adapter: adapterDescription,
332 DTypes: deviceTypes}); err == nil {
333 break
334 }
335 }
336 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000337 if retries == count {
338 return err
339 }
340 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400341 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000342 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400343
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000344 }
345 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400346 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000347 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000348 return nil
349}
350
khenaidoo7d3c5582021-08-11 18:09:44 -0400351// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
352func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
353 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
354
355 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
356 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
357
358 server.Start(ctx)
359 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
360}
361
khenaidoo42dcdfd2021-10-19 17:34:12 -0400362func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400363 logger.Info(ctx, "adding-adapter-service")
364
365 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400366 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400367 })
368}
369
khenaidoo42dcdfd2021-10-19 17:34:12 -0400370func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400371 logger.Info(ctx, "adding-onu-inter-adapter-service")
372
373 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400374 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400375 })
376}
377
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000378/**
379This function checks the liveliness and readiness of the kakfa and kv-client services
380and update the status in the probe.
381*/
382func (a *adapter) checkServicesReadiness(ctx context.Context) {
383 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400384 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000385
386 // checks the kv-store readiness
387 go a.checkKvStoreReadiness(ctx)
388}
389
390/**
391This function checks the liveliness and readiness of the kv-store service
392and update the status in the probe.
393*/
394func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
395 // dividing the live probe interval by 2 to get updated status every 30s
396 timeout := a.config.LiveProbeInterval / 2
397 kvStoreChannel := make(chan bool, 1)
398
khenaidoo7d3c5582021-08-11 18:09:44 -0400399 // Default true - we are here only after we already had a KV store connection
400 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000401 for {
402 timeoutTimer := time.NewTimer(timeout)
403 select {
404 case liveliness := <-kvStoreChannel:
405 if !liveliness {
406 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400407 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 timeout = a.config.NotLiveProbeInterval
409 } else {
410 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400411 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000412 timeout = a.config.LiveProbeInterval / 2
413 }
414 // Check if the timer has expired or not
415 if !timeoutTimer.Stop() {
416 <-timeoutTimer.C
417 }
418 case <-timeoutTimer.C:
419 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000420 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000421 if a.kvClient.IsConnectionUp(ctx) {
422 kvStoreChannel <- true
423 } else {
424 kvStoreChannel <- false
425 }
426 }
427 }
428}
429
khenaidoo7d3c5582021-08-11 18:09:44 -0400430// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
431// context times out.
432func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
433 if kvClient == nil {
434 return errors.New("kvclient-is-nil")
435 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000436 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400437 if !kvClient.IsConnectionUp(ctx) {
438 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
439 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
440 select {
441 case <-time.After(connectionRetryInterval):
442 continue
443 case <-ctx.Done():
444 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000445 }
446 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400447 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
448 logger.Info(ctx, "kv-connection-up")
449 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000450 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400451 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000452}
453
454// Adapter Utility methods ##### end #########
455// #############################################
456
dbainbri4d3a0dc2020-12-02 00:33:42 +0000457func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000458 if version.VersionInfo.Version == "unknown-version" {
459 content, err := ioutil.ReadFile("VERSION")
460 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100461 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000462 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000463 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000464 }
465 return version.VersionInfo.Version
466}
467
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000468func printVersion(appName string) {
469 fmt.Println(appName)
470 fmt.Println(version.VersionInfo.String(" "))
471}
472
// printBanner writes the ASCII-art start-up banner to standard output, one
// row per line.
func printBanner() {
	rows := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
484
485func waitForExit(ctx context.Context) int {
486 signalChannel := make(chan os.Signal, 1)
487 signal.Notify(signalChannel,
488 syscall.SIGHUP,
489 syscall.SIGINT,
490 syscall.SIGTERM,
491 syscall.SIGQUIT)
492
493 exitChannel := make(chan int)
494
495 go func() {
496 select {
497 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000498 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000499 exitChannel <- 2
500 case s := <-signalChannel:
501 switch s {
502 case syscall.SIGHUP,
503 syscall.SIGINT,
504 syscall.SIGTERM,
505 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000506 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000507 exitChannel <- 0
508 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000509 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000510 exitChannel <- 1
511 }
512 }
513 }()
514
515 code := <-exitChannel
516 return code
517}
518
519func main() {
520 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000521 ctx, cancel := context.WithCancel(context.Background())
522 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000523
khenaidoo7d3c5582021-08-11 18:09:44 -0400524 cf := &config.AdapterFlags{}
525 cf.ParseCommandArguments(os.Args[1:])
526
dbainbri4d3a0dc2020-12-02 00:33:42 +0000527 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000528
529 // Setup logging
530
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000531 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700532 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000533 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700534 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000535
536 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000537 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000538 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000539 }
540
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000541 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000542 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000543 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000544 }
545
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000546 log.SetAllLogLevel(logLevel)
547
dbainbri4d3a0dc2020-12-02 00:33:42 +0000548 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000549
Himani Chawla4d908332020-08-31 12:30:20 +0530550 defer func() {
551 _ = log.CleanUp()
552 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000553 // Print version / build information and exit
554 if cf.DisplayVersionOnly {
555 printVersion(defaultAppName)
556 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000557 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000558 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
559 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
560 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000561
562 // Print banner if specified
563 if cf.Banner {
564 printBanner()
565 }
566
dbainbri4d3a0dc2020-12-02 00:33:42 +0000567 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000568
569 ad := newAdapter(cf)
570
571 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000572 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000573
dbainbri4d3a0dc2020-12-02 00:33:42 +0000574 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
575 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000576
577 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
578
dbainbri4d3a0dc2020-12-02 00:33:42 +0000579 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
580 if err != nil {
581 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
582 } else {
583 defer log.TerminateTracing(closer)
584 }
585
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000586 go func() {
587 err := ad.start(probeCtx)
588 // If this operation returns an error
589 // cancel all operations using this context
590 if err != nil {
591 cancel()
592 }
593 }()
594
595 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000596 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000597
khenaidoo1fd58e02021-10-13 11:51:20 -0400598 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
599 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
600
Girish Gowdra55507832022-06-01 18:12:06 -0700601 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
602 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000603 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700604 ad.stop(ctxWithCancel)
605 // Will halt any long-running stop routine gracefully
606 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000607
608 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000609 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
610 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000611}