blob: 07f6d506e9649c1829e67579a46fc00510202ac2 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
khenaidoo7d3c5582021-08-11 18:09:44 -040031 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
35 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
36 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v7/pkg/log"
38 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040040 "github.com/opencord/voltha-protos/v5/go/adapter_service"
41 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040042 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040043 "github.com/opencord/voltha-protos/v5/go/voltha"
44 "google.golang.org/grpc"
45
khenaidoo42dcdfd2021-10-19 17:34:12 -040046 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000047
Matteo Scandolo761f7512020-11-23 15:52:40 -080048 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000049 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000050)
51
khenaidoo7d3c5582021-08-11 18:09:44 -040052const (
53 clusterMessagingService = "cluster-message-service"
54 onuAdapterService = "onu-adapter-service"
55 kvService = "kv-service"
56 coreService = "core-service"
57)
58
Holger Hildebrandtfa074992020-03-27 15:42:06 +000059type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053060 //defaultAppName string
khenaidoof3333552021-12-15 16:52:31 -050061 instanceID string
62 config *config.AdapterFlags
63 kafkaClient kafka.Client
64 kvClient kvstore.Client
65 eventProxy eventif.EventProxy
66 grpcServer *vgrpc.GrpcServer
67 onuAdapter *ac.OpenONUAC
68 onuInterAdapter *ac.OpenONUACInterAdapter
69 coreClient *vgrpc.Client
Holger Hildebrandtfa074992020-03-27 15:42:06 +000070}
71
Holger Hildebrandtfa074992020-03-27 15:42:06 +000072func newAdapter(cf *config.AdapterFlags) *adapter {
73 var a adapter
74 a.instanceID = cf.InstanceID
75 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000076 return &a
77}
78
79func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000080 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000081 var err error
82
83 var p *probe.Probe
84 if value := ctx.Value(probe.ProbeContextKey); value != nil {
85 if _, ok := value.(*probe.Probe); ok {
86 p = value.(*probe.Probe)
87 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000088 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040089 clusterMessagingService,
90 kvService,
91 onuAdapterService,
92 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000093 )
94 }
95 }
96
97 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000098 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
99 if err = a.setKVClient(ctx); err != nil {
100 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000101 }
102
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000103 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800104 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 go conf.StartLogLevelConfigProcessing(cm, ctx)
106
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000107 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400108 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000109 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000110 }
111
khenaidoo7d3c5582021-08-11 18:09:44 -0400112 // Start kafka communication with the broker
113 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
114 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000115 }
116
khenaidoo7d3c5582021-08-11 18:09:44 -0400117 // Wait until connection to KV store is established
118 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
119 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000120 }
121
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000122 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530123 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400124 go func() {
125 if err := a.eventProxy.Start(); err != nil {
126 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
127 }
128 }()
129
130 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
131 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500132 if a.coreClient, err = vgrpc.NewClient(
133 a.config.AdapterEndpoint,
134 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500135 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500136 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400137 logger.Fatal(ctx, "grpc-client-not-created")
138 }
139 // Start the core grpc client
khenaidoof3333552021-12-15 16:52:31 -0500140 go a.coreClient.Start(ctx, getCoreServiceClientHandler)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000141
142 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500143 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
144 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
145 }
146
147 // Create the open ONU Inter adapter
148 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
149 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000150 }
151
khenaidoo7d3c5582021-08-11 18:09:44 -0400152 // Create and start the grpc server
153 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
154
155 //Register the adapter service
156 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
157
158 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500159 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400160
161 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000162
163 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400164 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000165 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000166 }
167
khenaidoo7d3c5582021-08-11 18:09:44 -0400168 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 a.checkServicesReadiness(ctx)
170 return err
171}
172
khenaidoo7d3c5582021-08-11 18:09:44 -0400173// TODO: Any action the adapter needs to do following a Core restart?
174func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
175 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
176 return nil
177}
178
khenaidoof3333552021-12-15 16:52:31 -0500179// getCoreServiceClientHandler is used to setup the remote gRPC service
180func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
181 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400182 return nil
183 }
khenaidoof3333552021-12-15 16:52:31 -0500184 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400185}
186
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000187func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500188 // Cleanup the grpc services first
189 if err := a.onuAdapter.Stop(ctx); err != nil {
190 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
191 }
192 if err := a.onuInterAdapter.Stop(ctx); err != nil {
193 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
194 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000195 // Cleanup - applies only if we had a kvClient
196 if a.kvClient != nil {
197 // Release all reservations
198 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000199 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000200 }
201 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700202 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000203 }
204
khenaidoo7d3c5582021-08-11 18:09:44 -0400205 if a.eventProxy != nil {
206 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000207 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400208
209 if a.kafkaClient != nil {
210 a.kafkaClient.Stop(ctx)
211 }
212
213 // Stop core client
214 if a.coreClient != nil {
215 a.coreClient.Stop(ctx)
216 }
217
218 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000219}
220
221// #############################################
222// Adapter Utility methods ##### begin #########
223
dbainbri4d3a0dc2020-12-02 00:33:42 +0000224func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
225 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000226 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000227 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000228 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000229 }
230 return nil, errors.New("unsupported-kv-store")
231}
232
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800233func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000234
dbainbri4d3a0dc2020-12-02 00:33:42 +0000235 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000236
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 switch clientType {
238 case "sarama":
239 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000240 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000241 kafka.ProducerReturnOnErrors(true),
242 kafka.ProducerReturnOnSuccess(true),
243 kafka.ProducerMaxRetries(6),
244 kafka.ProducerRetryBackoff(time.Millisecond*30),
245 kafka.MetadatMaxRetries(15)), nil
246 }
247
248 return nil, errors.New("unsupported-client-type")
249}
250
dbainbri4d3a0dc2020-12-02 00:33:42 +0000251func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800252 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000253 if err != nil {
254 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000255 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000256 return err
257 }
258 a.kvClient = client
259 return nil
260}
261
khenaidoof3333552021-12-15 16:52:31 -0500262func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800263 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000264 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400265 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000266
267 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000268 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000269 return nil, err
270 }
271
dbainbri4d3a0dc2020-12-02 00:33:42 +0000272 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000273 return sAcONU, nil
274}
275
khenaidoof3333552021-12-15 16:52:31 -0500276func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
277 var err error
278 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
279
280 if err = sAcONUInterAdapter.Start(ctx); err != nil {
281 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
282 return nil, err
283 }
284
285 logger.Info(ctx, "OpenONUACInterAdapter-started")
286 return sAcONUInterAdapter, nil
287}
288
khenaidoo7d3c5582021-08-11 18:09:44 -0400289func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000290 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100291 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000292 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000293 "adapterID": adapterID,
294 "currentReplica": a.config.CurrentReplica,
295 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100296 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000297 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700298 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100299 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
300 Vendor: "VOLTHA OpenONUGo",
301 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400302 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700303 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000304 CurrentReplica: int32(a.config.CurrentReplica),
305 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700306 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000307 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100308 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400309 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
310 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000311 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
312 AcceptsAddRemoveFlowUpdates: true}}
313 deviceTypes := &voltha.DeviceTypes{Items: types}
314 count := 0
315 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400316 gClient, err := a.coreClient.GetCoreServiceClient()
317 if gClient != nil {
318 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400319 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400320 Adapter: adapterDescription,
321 DTypes: deviceTypes}); err == nil {
322 break
323 }
324 }
325 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000326 if retries == count {
327 return err
328 }
329 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400330 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000331 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400332
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000333 }
334 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400335 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000336 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000337 return nil
338}
339
khenaidoo7d3c5582021-08-11 18:09:44 -0400340// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
341func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
342 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
343
344 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
345 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
346
347 server.Start(ctx)
348 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
349}
350
khenaidoo42dcdfd2021-10-19 17:34:12 -0400351func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400352 logger.Info(ctx, "adding-adapter-service")
353
354 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400355 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400356 })
357}
358
khenaidoo42dcdfd2021-10-19 17:34:12 -0400359func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400360 logger.Info(ctx, "adding-onu-inter-adapter-service")
361
362 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400363 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400364 })
365}
366
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000367/**
368This function checks the liveliness and readiness of the kakfa and kv-client services
369and update the status in the probe.
370*/
371func (a *adapter) checkServicesReadiness(ctx context.Context) {
372 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400373 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000374
375 // checks the kv-store readiness
376 go a.checkKvStoreReadiness(ctx)
377}
378
379/**
380This function checks the liveliness and readiness of the kv-store service
381and update the status in the probe.
382*/
383func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
384 // dividing the live probe interval by 2 to get updated status every 30s
385 timeout := a.config.LiveProbeInterval / 2
386 kvStoreChannel := make(chan bool, 1)
387
khenaidoo7d3c5582021-08-11 18:09:44 -0400388 // Default true - we are here only after we already had a KV store connection
389 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000390 for {
391 timeoutTimer := time.NewTimer(timeout)
392 select {
393 case liveliness := <-kvStoreChannel:
394 if !liveliness {
395 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400396 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000397 timeout = a.config.NotLiveProbeInterval
398 } else {
399 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400400 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000401 timeout = a.config.LiveProbeInterval / 2
402 }
403 // Check if the timer has expired or not
404 if !timeoutTimer.Stop() {
405 <-timeoutTimer.C
406 }
407 case <-timeoutTimer.C:
408 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000409 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410 if a.kvClient.IsConnectionUp(ctx) {
411 kvStoreChannel <- true
412 } else {
413 kvStoreChannel <- false
414 }
415 }
416 }
417}
418
khenaidoo7d3c5582021-08-11 18:09:44 -0400419// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
420// context times out.
421func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
422 if kvClient == nil {
423 return errors.New("kvclient-is-nil")
424 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000425 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400426 if !kvClient.IsConnectionUp(ctx) {
427 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
428 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
429 select {
430 case <-time.After(connectionRetryInterval):
431 continue
432 case <-ctx.Done():
433 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000434 }
435 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400436 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
437 logger.Info(ctx, "kv-connection-up")
438 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000439 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400440 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000441}
442
443// Adapter Utility methods ##### end #########
444// #############################################
445
dbainbri4d3a0dc2020-12-02 00:33:42 +0000446func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000447 if version.VersionInfo.Version == "unknown-version" {
448 content, err := ioutil.ReadFile("VERSION")
449 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100450 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000451 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000452 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000453 }
454 return version.VersionInfo.Version
455}
456
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000457func printVersion(appName string) {
458 fmt.Println(appName)
459 fmt.Println(version.VersionInfo.String(" "))
460}
461
462func printBanner() {
463 fmt.Println(" ____ ____ ___ ___ _ ")
464 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
465 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
466 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
467 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
468 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
469 fmt.Println(" | | __| |")
470 fmt.Println(" |_| |____/")
471 fmt.Println(" ")
472}
473
474func waitForExit(ctx context.Context) int {
475 signalChannel := make(chan os.Signal, 1)
476 signal.Notify(signalChannel,
477 syscall.SIGHUP,
478 syscall.SIGINT,
479 syscall.SIGTERM,
480 syscall.SIGQUIT)
481
482 exitChannel := make(chan int)
483
484 go func() {
485 select {
486 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000487 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000488 exitChannel <- 2
489 case s := <-signalChannel:
490 switch s {
491 case syscall.SIGHUP,
492 syscall.SIGINT,
493 syscall.SIGTERM,
494 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000495 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000496 exitChannel <- 0
497 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000498 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000499 exitChannel <- 1
500 }
501 }
502 }()
503
504 code := <-exitChannel
505 return code
506}
507
508func main() {
509 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000510 ctx, cancel := context.WithCancel(context.Background())
511 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000512
khenaidoo7d3c5582021-08-11 18:09:44 -0400513 cf := &config.AdapterFlags{}
514 cf.ParseCommandArguments(os.Args[1:])
515
dbainbri4d3a0dc2020-12-02 00:33:42 +0000516 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000517
518 // Setup logging
519
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000520 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700521 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000522 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700523 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000524
525 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000526 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000527 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000528 }
529
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000530 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000531 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000532 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000533 }
534
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000535 log.SetAllLogLevel(logLevel)
536
dbainbri4d3a0dc2020-12-02 00:33:42 +0000537 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000538
Himani Chawla4d908332020-08-31 12:30:20 +0530539 defer func() {
540 _ = log.CleanUp()
541 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000542 // Print version / build information and exit
543 if cf.DisplayVersionOnly {
544 printVersion(defaultAppName)
545 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000546 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000547 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
548 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
549 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000550
551 // Print banner if specified
552 if cf.Banner {
553 printBanner()
554 }
555
dbainbri4d3a0dc2020-12-02 00:33:42 +0000556 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000557
558 ad := newAdapter(cf)
559
560 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000561 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000562
dbainbri4d3a0dc2020-12-02 00:33:42 +0000563 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
564 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000565
566 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
567
dbainbri4d3a0dc2020-12-02 00:33:42 +0000568 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
569 if err != nil {
570 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
571 } else {
572 defer log.TerminateTracing(closer)
573 }
574
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000575 go func() {
576 err := ad.start(probeCtx)
577 // If this operation returns an error
578 // cancel all operations using this context
579 if err != nil {
580 cancel()
581 }
582 }()
583
584 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000585 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000586
khenaidoo1fd58e02021-10-13 11:51:20 -0400587 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
588 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
589
Girish Gowdra55507832022-06-01 18:12:06 -0700590 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
591 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000592 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700593 ad.stop(ctxWithCancel)
594 // Will halt any long-running stop routine gracefully
595 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000596
597 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000598 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
599 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000600}