blob: 96bfabe51c8556a676f5d8413abe2bac22f6fb6f [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
khenaidoo7d3c5582021-08-11 18:09:44 -040031 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
35 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
36 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v7/pkg/log"
38 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040040 "github.com/opencord/voltha-protos/v5/go/adapter_service"
khenaidoo55cebc62021-12-08 14:44:41 -050041 "github.com/opencord/voltha-protos/v5/go/common"
khenaidoo42dcdfd2021-10-19 17:34:12 -040042 "github.com/opencord/voltha-protos/v5/go/core_service"
43 "github.com/opencord/voltha-protos/v5/go/health"
44 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040045 "github.com/opencord/voltha-protos/v5/go/voltha"
46 "google.golang.org/grpc"
47
khenaidoo42dcdfd2021-10-19 17:34:12 -040048 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049
Matteo Scandolo761f7512020-11-23 15:52:40 -080050 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000051 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052)
53
khenaidoo7d3c5582021-08-11 18:09:44 -040054const (
55 clusterMessagingService = "cluster-message-service"
56 onuAdapterService = "onu-adapter-service"
57 kvService = "kv-service"
58 coreService = "core-service"
59)
60
Holger Hildebrandtfa074992020-03-27 15:42:06 +000061type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053062 //defaultAppName string
khenaidoo7d3c5582021-08-11 18:09:44 -040063 instanceID string
64 config *config.AdapterFlags
65 kafkaClient kafka.Client
66 kvClient kvstore.Client
67 eventProxy eventif.EventProxy
68 grpcServer *vgrpc.GrpcServer
69 onuAdapter *ac.OpenONUAC
70 coreClient *vgrpc.Client
Holger Hildebrandtfa074992020-03-27 15:42:06 +000071}
72
Holger Hildebrandtfa074992020-03-27 15:42:06 +000073func newAdapter(cf *config.AdapterFlags) *adapter {
74 var a adapter
75 a.instanceID = cf.InstanceID
76 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000077 return &a
78}
79
80func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000081 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000082 var err error
83
84 var p *probe.Probe
85 if value := ctx.Value(probe.ProbeContextKey); value != nil {
86 if _, ok := value.(*probe.Probe); ok {
87 p = value.(*probe.Probe)
88 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000089 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040090 clusterMessagingService,
91 kvService,
92 onuAdapterService,
93 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000094 )
95 }
96 }
97
98 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000099 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
100 if err = a.setKVClient(ctx); err != nil {
101 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000102 }
103
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000104 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800105 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 go conf.StartLogLevelConfigProcessing(cm, ctx)
107
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 // Setup Kafka Client
khenaidoo7d3c5582021-08-11 18:09:44 -0400109 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000110 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 }
112
khenaidoo7d3c5582021-08-11 18:09:44 -0400113 // Start kafka communication with the broker
114 if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
115 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000116 }
117
khenaidoo7d3c5582021-08-11 18:09:44 -0400118 // Wait until connection to KV store is established
119 if err := WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
120 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000121 }
122
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000123 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530124 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400125 go func() {
126 if err := a.eventProxy.Start(); err != nil {
127 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
128 }
129 }()
130
131 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
132 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500133 if a.coreClient, err = vgrpc.NewClient(
134 a.config.AdapterEndpoint,
135 a.config.CoreEndpoint,
136 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400137 logger.Fatal(ctx, "grpc-client-not-created")
138 }
139 // Start the core grpc client
140 go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000141
142 // Create the open ONU interface adapter
khenaidoo7d3c5582021-08-11 18:09:44 -0400143 if a.onuAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000144 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000145 }
146
khenaidoo7d3c5582021-08-11 18:09:44 -0400147 // Create and start the grpc server
148 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
149
150 //Register the adapter service
151 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
152
153 //Register the onu inter adapter service
154 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuAdapter)
155
156 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000157
158 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400159 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000160 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000161 }
162
khenaidoo7d3c5582021-08-11 18:09:44 -0400163 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000164 a.checkServicesReadiness(ctx)
165 return err
166}
167
khenaidoo7d3c5582021-08-11 18:09:44 -0400168// TODO: Any action the adapter needs to do following a Core restart?
169func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
170 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
171 return nil
172}
173
174// setAndTestCoreServiceHandler is used to test whether the remote gRPC service is up
khenaidoo55cebc62021-12-08 14:44:41 -0500175func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn, clientConn *common.Connection) interface{} {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400176 svc := core_service.NewCoreServiceClient(conn)
khenaidoo55cebc62021-12-08 14:44:41 -0500177 if h, err := svc.GetHealthStatus(ctx, clientConn); err != nil || h.State != health.HealthStatus_HEALTHY {
khenaidoo7d3c5582021-08-11 18:09:44 -0400178 return nil
179 }
180 return svc
181}
182
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000183func (a *adapter) stop(ctx context.Context) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 // Cleanup - applies only if we had a kvClient
185 if a.kvClient != nil {
186 // Release all reservations
187 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000188 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000189 }
190 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000191 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000192 }
193
khenaidoo7d3c5582021-08-11 18:09:44 -0400194 if a.eventProxy != nil {
195 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000196 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400197
198 if a.kafkaClient != nil {
199 a.kafkaClient.Stop(ctx)
200 }
201
202 // Stop core client
203 if a.coreClient != nil {
204 a.coreClient.Stop(ctx)
205 }
206
207 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000208}
209
210// #############################################
211// Adapter Utility methods ##### begin #########
212
dbainbri4d3a0dc2020-12-02 00:33:42 +0000213func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
214 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000215 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000216 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000217 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000218 }
219 return nil, errors.New("unsupported-kv-store")
220}
221
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800222func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000223
dbainbri4d3a0dc2020-12-02 00:33:42 +0000224 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000225
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000226 switch clientType {
227 case "sarama":
228 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000229 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230 kafka.ProducerReturnOnErrors(true),
231 kafka.ProducerReturnOnSuccess(true),
232 kafka.ProducerMaxRetries(6),
233 kafka.ProducerRetryBackoff(time.Millisecond*30),
234 kafka.MetadatMaxRetries(15)), nil
235 }
236
237 return nil, errors.New("unsupported-client-type")
238}
239
dbainbri4d3a0dc2020-12-02 00:33:42 +0000240func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800241 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000242 if err != nil {
243 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000244 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000245 return err
246 }
247 a.kvClient = client
248 return nil
249}
250
khenaidoo7d3c5582021-08-11 18:09:44 -0400251func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800252 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000253 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400254 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000255
256 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000257 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000258 return nil, err
259 }
260
dbainbri4d3a0dc2020-12-02 00:33:42 +0000261 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262 return sAcONU, nil
263}
264
khenaidoo7d3c5582021-08-11 18:09:44 -0400265func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000266 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100267 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000268 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000269 "adapterID": adapterID,
270 "currentReplica": a.config.CurrentReplica,
271 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100272 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000273 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700274 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100275 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
276 Vendor: "VOLTHA OpenONUGo",
277 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400278 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700279 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000280 CurrentReplica: int32(a.config.CurrentReplica),
281 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700282 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000283 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100284 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400285 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
286 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000287 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
288 AcceptsAddRemoveFlowUpdates: true}}
289 deviceTypes := &voltha.DeviceTypes{Items: types}
290 count := 0
291 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400292 gClient, err := a.coreClient.GetCoreServiceClient()
293 if gClient != nil {
294 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400295 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400296 Adapter: adapterDescription,
297 DTypes: deviceTypes}); err == nil {
298 break
299 }
300 }
301 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000302 if retries == count {
303 return err
304 }
305 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400306 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000307 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400308
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000309 }
310 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400311 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000312 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000313 return nil
314}
315
khenaidoo7d3c5582021-08-11 18:09:44 -0400316// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
317func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
318 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
319
320 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
321 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
322
323 server.Start(ctx)
324 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
325}
326
khenaidoo42dcdfd2021-10-19 17:34:12 -0400327func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400328 logger.Info(ctx, "adding-adapter-service")
329
330 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400331 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400332 })
333}
334
khenaidoo42dcdfd2021-10-19 17:34:12 -0400335func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400336 logger.Info(ctx, "adding-onu-inter-adapter-service")
337
338 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400339 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400340 })
341}
342
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000343/**
344This function checks the liveliness and readiness of the kakfa and kv-client services
345and update the status in the probe.
346*/
347func (a *adapter) checkServicesReadiness(ctx context.Context) {
348 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400349 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000350
351 // checks the kv-store readiness
352 go a.checkKvStoreReadiness(ctx)
353}
354
355/**
356This function checks the liveliness and readiness of the kv-store service
357and update the status in the probe.
358*/
359func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
360 // dividing the live probe interval by 2 to get updated status every 30s
361 timeout := a.config.LiveProbeInterval / 2
362 kvStoreChannel := make(chan bool, 1)
363
khenaidoo7d3c5582021-08-11 18:09:44 -0400364 // Default true - we are here only after we already had a KV store connection
365 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000366 for {
367 timeoutTimer := time.NewTimer(timeout)
368 select {
369 case liveliness := <-kvStoreChannel:
370 if !liveliness {
371 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400372 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000373 timeout = a.config.NotLiveProbeInterval
374 } else {
375 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400376 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000377 timeout = a.config.LiveProbeInterval / 2
378 }
379 // Check if the timer has expired or not
380 if !timeoutTimer.Stop() {
381 <-timeoutTimer.C
382 }
383 case <-timeoutTimer.C:
384 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000385 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000386 if a.kvClient.IsConnectionUp(ctx) {
387 kvStoreChannel <- true
388 } else {
389 kvStoreChannel <- false
390 }
391 }
392 }
393}
394
khenaidoo7d3c5582021-08-11 18:09:44 -0400395// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
396// context times out.
397func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
398 if kvClient == nil {
399 return errors.New("kvclient-is-nil")
400 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000401 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400402 if !kvClient.IsConnectionUp(ctx) {
403 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
404 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
405 select {
406 case <-time.After(connectionRetryInterval):
407 continue
408 case <-ctx.Done():
409 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410 }
411 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400412 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
413 logger.Info(ctx, "kv-connection-up")
414 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000415 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400416 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000417}
418
419// Adapter Utility methods ##### end #########
420// #############################################
421
dbainbri4d3a0dc2020-12-02 00:33:42 +0000422func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000423 if version.VersionInfo.Version == "unknown-version" {
424 content, err := ioutil.ReadFile("VERSION")
425 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100426 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000427 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000428 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000429 }
430 return version.VersionInfo.Version
431}
432
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000433func printVersion(appName string) {
434 fmt.Println(appName)
435 fmt.Println(version.VersionInfo.String(" "))
436}
437
438func printBanner() {
439 fmt.Println(" ____ ____ ___ ___ _ ")
440 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
441 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
442 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
443 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
444 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
445 fmt.Println(" | | __| |")
446 fmt.Println(" |_| |____/")
447 fmt.Println(" ")
448}
449
450func waitForExit(ctx context.Context) int {
451 signalChannel := make(chan os.Signal, 1)
452 signal.Notify(signalChannel,
453 syscall.SIGHUP,
454 syscall.SIGINT,
455 syscall.SIGTERM,
456 syscall.SIGQUIT)
457
458 exitChannel := make(chan int)
459
460 go func() {
461 select {
462 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000463 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000464 exitChannel <- 2
465 case s := <-signalChannel:
466 switch s {
467 case syscall.SIGHUP,
468 syscall.SIGINT,
469 syscall.SIGTERM,
470 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000471 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000472 exitChannel <- 0
473 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000474 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000475 exitChannel <- 1
476 }
477 }
478 }()
479
480 code := <-exitChannel
481 return code
482}
483
484func main() {
485 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000486 ctx, cancel := context.WithCancel(context.Background())
487 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000488
khenaidoo7d3c5582021-08-11 18:09:44 -0400489 cf := &config.AdapterFlags{}
490 cf.ParseCommandArguments(os.Args[1:])
491
dbainbri4d3a0dc2020-12-02 00:33:42 +0000492 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000493
494 // Setup logging
495
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000496 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700497 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000498 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700499 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000500
501 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000502 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000503 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000504 }
505
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000506 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000507 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000508 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000509 }
510
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000511 log.SetAllLogLevel(logLevel)
512
dbainbri4d3a0dc2020-12-02 00:33:42 +0000513 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000514
Himani Chawla4d908332020-08-31 12:30:20 +0530515 defer func() {
516 _ = log.CleanUp()
517 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000518 // Print version / build information and exit
519 if cf.DisplayVersionOnly {
520 printVersion(defaultAppName)
521 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000522 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000523 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
524 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
525 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000526
527 // Print banner if specified
528 if cf.Banner {
529 printBanner()
530 }
531
dbainbri4d3a0dc2020-12-02 00:33:42 +0000532 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000533
534 ad := newAdapter(cf)
535
536 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000537 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000538
dbainbri4d3a0dc2020-12-02 00:33:42 +0000539 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
540 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000541
542 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
543
dbainbri4d3a0dc2020-12-02 00:33:42 +0000544 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
545 if err != nil {
546 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
547 } else {
548 defer log.TerminateTracing(closer)
549 }
550
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000551 go func() {
552 err := ad.start(probeCtx)
553 // If this operation returns an error
554 // cancel all operations using this context
555 if err != nil {
556 cancel()
557 }
558 }()
559
560 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000561 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000562
khenaidoo1fd58e02021-10-13 11:51:20 -0400563 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
564 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
565
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000566 // Cleanup before leaving
567 ad.stop(ctx)
568
569 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000570 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
571 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000572}