blob: ce5dc6fe5719ca029d4e7eaf852a76ffcf11163b [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
khenaidoo7d3c5582021-08-11 18:09:44 -040031 "github.com/golang/protobuf/ptypes/empty"
32 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
41 "github.com/opencord/voltha-protos/v5/go/adapter_services"
42 "github.com/opencord/voltha-protos/v5/go/core"
43 "github.com/opencord/voltha-protos/v5/go/voltha"
44 "google.golang.org/grpc"
45
46 ic "github.com/opencord/voltha-protos/v5/go/inter_container"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000047
Matteo Scandolo761f7512020-11-23 15:52:40 -080048 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
49 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000050)
51
khenaidoo7d3c5582021-08-11 18:09:44 -040052const (
53 clusterMessagingService = "cluster-message-service"
54 onuAdapterService = "onu-adapter-service"
55 kvService = "kv-service"
56 coreService = "core-service"
57)
58
Holger Hildebrandtfa074992020-03-27 15:42:06 +000059type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053060 //defaultAppName string
khenaidoo7d3c5582021-08-11 18:09:44 -040061 instanceID string
62 config *config.AdapterFlags
63 kafkaClient kafka.Client
64 kvClient kvstore.Client
65 eventProxy eventif.EventProxy
66 grpcServer *vgrpc.GrpcServer
67 onuAdapter *ac.OpenONUAC
68 coreClient *vgrpc.Client
69 halted bool
70 exitChannel chan int
Holger Hildebrandtfa074992020-03-27 15:42:06 +000071}
72
Holger Hildebrandtfa074992020-03-27 15:42:06 +000073func newAdapter(cf *config.AdapterFlags) *adapter {
74 var a adapter
75 a.instanceID = cf.InstanceID
76 a.config = cf
77 a.halted = false
78 a.exitChannel = make(chan int, 1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +000079 return &a
80}
81
82func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000083 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000084 var err error
85
86 var p *probe.Probe
87 if value := ctx.Value(probe.ProbeContextKey); value != nil {
88 if _, ok := value.(*probe.Probe); ok {
89 p = value.(*probe.Probe)
90 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000091 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040092 clusterMessagingService,
93 kvService,
94 onuAdapterService,
95 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000096 )
97 }
98 }
99
100 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000101 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
102 if err = a.setKVClient(ctx); err != nil {
103 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000104 }
105
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800107 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000108 go conf.StartLogLevelConfigProcessing(cm, ctx)
109
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000110 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400111 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000112 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000113 }
114
khenaidoo7d3c5582021-08-11 18:09:44 -0400115 // Start kafka communication with the broker
116 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
117 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000118 }
119
khenaidoo7d3c5582021-08-11 18:09:44 -0400120 // Wait until connection to KV store is established
121 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
122 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000123 }
124
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000125 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530126 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400127 go func() {
128 if err := a.eventProxy.Start(); err != nil {
129 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
130 }
131 }()
132
133 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
134 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
135 if a.coreClient, err = vgrpc.NewClient(a.config.CoreEndpoint,
136 a.coreRestarted,
137 vgrpc.ActivityCheck(true)); err != nil {
138 logger.Fatal(ctx, "grpc-client-not-created")
139 }
140 // Start the core grpc client
141 go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142
143 // Create the open ONU interface adapter
khenaidoo7d3c5582021-08-11 18:09:44 -0400144 if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000145 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000146 }
147
khenaidoo7d3c5582021-08-11 18:09:44 -0400148 // Create and start the grpc server
149 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
150
151 //Register the adapter service
152 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
153
154 //Register the onu inter adapter service
155 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
156
157 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000158
159 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400160 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000161 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000162 }
163
khenaidoo7d3c5582021-08-11 18:09:44 -0400164 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000165 a.checkServicesReadiness(ctx)
166 return err
167}
168
khenaidoo7d3c5582021-08-11 18:09:44 -0400169// TODO: Any action the adapter needs to do following a Core restart?
170func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
171 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
172 return nil
173}
174
175// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
176func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
177 svc := core.NewCoreServiceClient(conn)
178 if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
179 return nil
180 }
181 return svc
182}
183
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184func (a *adapter) stop(ctx context.Context) {
185 // Stop leadership tracking
186 a.halted = true
187
188 // send exit signal
189 a.exitChannel <- 0
190
191 // Cleanup - applies only if we had a kvClient
192 if a.kvClient != nil {
193 // Release all reservations
194 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000195 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000196 }
197 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000198 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000199 }
200
khenaidoo7d3c5582021-08-11 18:09:44 -0400201 if a.eventProxy != nil {
202 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000203 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400204
205 if a.kafkaClient != nil {
206 a.kafkaClient.Stop(ctx)
207 }
208
209 // Stop core client
210 if a.coreClient != nil {
211 a.coreClient.Stop(ctx)
212 }
213
214 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000215}
216
217// #############################################
218// Adapter Utility methods ##### begin #########
219
dbainbri4d3a0dc2020-12-02 00:33:42 +0000220func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
221 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000222 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000223 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000224 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000225 }
226 return nil, errors.New("unsupported-kv-store")
227}
228
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800229func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230
dbainbri4d3a0dc2020-12-02 00:33:42 +0000231 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000232
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000233 switch clientType {
234 case "sarama":
235 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000236 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 kafka.ProducerReturnOnErrors(true),
238 kafka.ProducerReturnOnSuccess(true),
239 kafka.ProducerMaxRetries(6),
240 kafka.ProducerRetryBackoff(time.Millisecond*30),
241 kafka.MetadatMaxRetries(15)), nil
242 }
243
244 return nil, errors.New("unsupported-client-type")
245}
246
dbainbri4d3a0dc2020-12-02 00:33:42 +0000247func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800248 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000249 if err != nil {
250 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000251 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000252 return err
253 }
254 a.kvClient = client
255 return nil
256}
257
khenaidoo7d3c5582021-08-11 18:09:44 -0400258func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800259 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000260 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400261 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262
263 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000264 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000265 return nil, err
266 }
267
dbainbri4d3a0dc2020-12-02 00:33:42 +0000268 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000269 return sAcONU, nil
270}
271
khenaidoo7d3c5582021-08-11 18:09:44 -0400272func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000273 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100274 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000275 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000276 "adapterID": adapterID,
277 "currentReplica": a.config.CurrentReplica,
278 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100279 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000280 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700281 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100282 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
283 Vendor: "VOLTHA OpenONUGo",
284 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400285 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700286 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000287 CurrentReplica: int32(a.config.CurrentReplica),
288 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700289 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000290 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100291 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400292 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
293 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000294 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
295 AcceptsAddRemoveFlowUpdates: true}}
296 deviceTypes := &voltha.DeviceTypes{Items: types}
297 count := 0
298 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400299 gClient, err := a.coreClient.GetCoreServiceClient()
300 if gClient != nil {
301 if gClient != nil {
302 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ic.AdapterRegistration{
303 Adapter: adapterDescription,
304 DTypes: deviceTypes}); err == nil {
305 break
306 }
307 }
308 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000309 if retries == count {
310 return err
311 }
312 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400313 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000314 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400315
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000316 }
317 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400318 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000319 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000320 return nil
321}
322
khenaidoo7d3c5582021-08-11 18:09:44 -0400323// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
324func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
325 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
326
327 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
328 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
329
330 server.Start(ctx)
331 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
332}
333
334func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.AdapterServiceServer) {
335 logger.Info(ctx, "adding-adapter-service")
336
337 server.AddService(func(gs *grpc.Server) {
338 adapter_services.RegisterAdapterServiceServer(gs, handler)
339 })
340}
341
342func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.OnuInterAdapterServiceServer) {
343 logger.Info(ctx, "adding-onu-inter-adapter-service")
344
345 server.AddService(func(gs *grpc.Server) {
346 adapter_services.RegisterOnuInterAdapterServiceServer(gs, handler)
347 })
348}
349
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000350/**
351This function checks the liveliness and readiness of the kakfa and kv-client services
352and update the status in the probe.
353*/
354func (a *adapter) checkServicesReadiness(ctx context.Context) {
355 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400356 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000357
358 // checks the kv-store readiness
359 go a.checkKvStoreReadiness(ctx)
360}
361
362/**
363This function checks the liveliness and readiness of the kv-store service
364and update the status in the probe.
365*/
366func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
367 // dividing the live probe interval by 2 to get updated status every 30s
368 timeout := a.config.LiveProbeInterval / 2
369 kvStoreChannel := make(chan bool, 1)
370
khenaidoo7d3c5582021-08-11 18:09:44 -0400371 // Default true - we are here only after we already had a KV store connection
372 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000373 for {
374 timeoutTimer := time.NewTimer(timeout)
375 select {
376 case liveliness := <-kvStoreChannel:
377 if !liveliness {
378 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400379 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000380 timeout = a.config.NotLiveProbeInterval
381 } else {
382 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400383 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000384 timeout = a.config.LiveProbeInterval / 2
385 }
386 // Check if the timer has expired or not
387 if !timeoutTimer.Stop() {
388 <-timeoutTimer.C
389 }
390 case <-timeoutTimer.C:
391 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000392 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000393 if a.kvClient.IsConnectionUp(ctx) {
394 kvStoreChannel <- true
395 } else {
396 kvStoreChannel <- false
397 }
398 }
399 }
400}
401
khenaidoo7d3c5582021-08-11 18:09:44 -0400402// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
403// context times out.
404func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
405 if kvClient == nil {
406 return errors.New("kvclient-is-nil")
407 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400409 if !kvClient.IsConnectionUp(ctx) {
410 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
411 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
412 select {
413 case <-time.After(connectionRetryInterval):
414 continue
415 case <-ctx.Done():
416 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000417 }
418 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400419 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
420 logger.Info(ctx, "kv-connection-up")
421 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000422 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400423 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000424}
425
426// Adapter Utility methods ##### end #########
427// #############################################
428
dbainbri4d3a0dc2020-12-02 00:33:42 +0000429func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000430 if version.VersionInfo.Version == "unknown-version" {
431 content, err := ioutil.ReadFile("VERSION")
432 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100433 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000434 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000435 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000436 }
437 return version.VersionInfo.Version
438}
439
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000440func printVersion(appName string) {
441 fmt.Println(appName)
442 fmt.Println(version.VersionInfo.String(" "))
443}
444
445func printBanner() {
446 fmt.Println(" ____ ____ ___ ___ _ ")
447 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
448 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
449 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
450 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
451 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
452 fmt.Println(" | | __| |")
453 fmt.Println(" |_| |____/")
454 fmt.Println(" ")
455}
456
457func waitForExit(ctx context.Context) int {
458 signalChannel := make(chan os.Signal, 1)
459 signal.Notify(signalChannel,
460 syscall.SIGHUP,
461 syscall.SIGINT,
462 syscall.SIGTERM,
463 syscall.SIGQUIT)
464
465 exitChannel := make(chan int)
466
467 go func() {
468 select {
469 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000470 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000471 exitChannel <- 2
472 case s := <-signalChannel:
473 switch s {
474 case syscall.SIGHUP,
475 syscall.SIGINT,
476 syscall.SIGTERM,
477 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000478 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000479 exitChannel <- 0
480 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000481 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000482 exitChannel <- 1
483 }
484 }
485 }()
486
487 code := <-exitChannel
488 return code
489}
490
491func main() {
492 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000493 ctx, cancel := context.WithCancel(context.Background())
494 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000495
khenaidoo7d3c5582021-08-11 18:09:44 -0400496 cf := &config.AdapterFlags{}
497 cf.ParseCommandArguments(os.Args[1:])
498
dbainbri4d3a0dc2020-12-02 00:33:42 +0000499 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000500
501 // Setup logging
502
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000503 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700504 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000505 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700506 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000507
508 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000509 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000510 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000511 }
512
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000513 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000514 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000515 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000516 }
517
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000518 log.SetAllLogLevel(logLevel)
519
dbainbri4d3a0dc2020-12-02 00:33:42 +0000520 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000521
Himani Chawla4d908332020-08-31 12:30:20 +0530522 defer func() {
523 _ = log.CleanUp()
524 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000525 // Print version / build information and exit
526 if cf.DisplayVersionOnly {
527 printVersion(defaultAppName)
528 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000529 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000530 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
531 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
532 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000533
534 // Print banner if specified
535 if cf.Banner {
536 printBanner()
537 }
538
dbainbri4d3a0dc2020-12-02 00:33:42 +0000539 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000540
541 ad := newAdapter(cf)
542
543 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000544 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000545
dbainbri4d3a0dc2020-12-02 00:33:42 +0000546 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
547 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000548
549 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
550
dbainbri4d3a0dc2020-12-02 00:33:42 +0000551 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
552 if err != nil {
553 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
554 } else {
555 defer log.TerminateTracing(closer)
556 }
557
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000558 go func() {
559 err := ad.start(probeCtx)
560 // If this operation returns an error
561 // cancel all operations using this context
562 if err != nil {
563 cancel()
564 }
565 }()
566
567 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000568 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000569
570 // Cleanup before leaving
571 ad.stop(ctx)
572
573 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000574 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
575 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000576}