/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package main is the entry point of the OpenOnuAdapter.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000034 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000035 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v3/pkg/log"
38 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000039 "github.com/opencord/voltha-lib-go/v3/pkg/version"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000040 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
41 "github.com/opencord/voltha-protos/v3/go/voltha"
42
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000043 "test.internal/openadapter/internal/pkg/config"
44 ac "test.internal/openadapter/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000045)
46
47type adapter struct {
48 defaultAppName string
49 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000074 logger.Info("Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
82 "message-bus",
83 "kv-store",
84 "container-proxy",
85 "core-request-handler",
86 "register-with-core",
87 )
88 }
89 }
90
91 // Setup KV Client
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000092 logger.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000093 if err = a.setKVClient(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000094 logger.Fatalw("error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 }
96
97 if p != nil {
98 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
99 }
100
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000101 // Setup Log Config
102 cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, a.config.KVStoreHost, a.config.KVStorePort, a.config.KVStoreTimeout)
103 go conf.StartLogLevelConfigProcessing(cm, ctx)
104
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000105 // Setup Kafka Client
106 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000107 logger.Fatalw("Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 }
109
110 if p != nil {
111 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
112 }
113
114 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
115 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000116 logger.Fatalw("error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000117 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
118 return err
119 }
120
121 // Create the core proxy to handle requests to the Core
122 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
123
124 // Create the adaptor proxy to handle request between olt and onu
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000125 a.adapterProxy = com.NewAdapterProxy(a.kip, "openolt", a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000126
127 // Create the event proxy to post events to KAFKA
128 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
129
130 // Create the open ONU interface adapter
131 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
132 a.config); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000133 logger.Fatalw("error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000134 }
135
136 // Register the core request handler
137 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000138 logger.Fatalw("error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000139 }
140
141 // Register this adapter to the Core - retries indefinitely
142 if err = a.registerWithCore(ctx, -1); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000143 logger.Fatalw("error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000144 }
145
146 // check the readiness and liveliness and update the probe status
147 a.checkServicesReadiness(ctx)
148 return err
149}
150
151func (a *adapter) stop(ctx context.Context) {
152 // Stop leadership tracking
153 a.halted = true
154
155 // send exit signal
156 a.exitChannel <- 0
157
158 // Cleanup - applies only if we had a kvClient
159 if a.kvClient != nil {
160 // Release all reservations
161 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000162 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000163 }
164 // Close the DB connection
165 a.kvClient.Close()
166 }
167
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000168 if a.kip != nil {
169 a.kip.Stop()
170 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000171}
172
173// #############################################
174// Adapter Utility methods ##### begin #########
175
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000176func newKVClient(storeType, address string, timeout int) (kvstore.Client, error) {
177
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000178 logger.Infow("kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000179 switch storeType {
180 case "consul":
181 return kvstore.NewConsulClient(address, timeout)
182 case "etcd":
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700183 return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 }
185 return nil, errors.New("unsupported-kv-store")
186}
187
188func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
189
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000190 logger.Infow("common-client-type", log.Fields{"client": clientType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000191 switch clientType {
192 case "sarama":
193 return kafka.NewSaramaClient(
194 kafka.Host(host),
195 kafka.Port(port),
196 kafka.ProducerReturnOnErrors(true),
197 kafka.ProducerReturnOnSuccess(true),
198 kafka.ProducerMaxRetries(6),
199 kafka.ProducerRetryBackoff(time.Millisecond*30),
200 kafka.MetadatMaxRetries(15)), nil
201 }
202
203 return nil, errors.New("unsupported-client-type")
204}
205
206func (a *adapter) setKVClient() error {
207 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
208 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
209 if err != nil {
210 a.kvClient = nil
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000211 logger.Errorw("error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000212 return err
213 }
214 a.kvClient = client
215 return nil
216}
217
218func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000219 logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000220 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
221 var err error
222 kip := kafka.NewInterContainerProxy(
223 kafka.InterContainerHost(a.config.KafkaAdapterHost),
224 kafka.InterContainerPort(a.config.KafkaAdapterPort),
225 kafka.MsgClient(a.kafkaClient),
226 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000227 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000228 for {
229 if err = kip.Start(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000230 logger.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
231 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000232 return nil, err
233 }
234 count++
235 // Take a nap before retrying
236 time.Sleep(2 * time.Second)
237 } else {
238 break
239 }
240 }
241 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000242 logger.Info("common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000243 return kip, nil
244}
245
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000246func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
247 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
248 cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
249 var err error
250 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, cfg)
251
252 if err = sAcONU.Start(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000253 logger.Fatalw("error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000254 return nil, err
255 }
256
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000257 logger.Info("open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000258 return sAcONU, nil
259}
260
261func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000262 logger.Info("setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000263 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
264 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000265 logger.Errorw("request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000266 return err
267
268 }
269 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000270 logger.Info("request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 return nil
272}
273
274func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000275 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
276 logger.Infow("registering-with-core", log.Fields{
277 "adapterID": adapterID,
278 "currentReplica": a.config.CurrentReplica,
279 "totalReplicas": a.config.TotalReplicas,
280 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700281 adapterDescription := &voltha.Adapter{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000282 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000283 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700284 Version: version.VersionInfo.Version,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000285 // TODO once we'll be ready to support multiple versions of the adapter
286 // the Endpoint will have to change to `brcm_openomci_onu_<currentReplica`>
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700287 Endpoint: "brcm_openomci_onu",
288 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000289 CurrentReplica: int32(a.config.CurrentReplica),
290 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700291 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000292 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
293 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM"},
294 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
295 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
296 AcceptsAddRemoveFlowUpdates: true}}
297 deviceTypes := &voltha.DeviceTypes{Items: types}
298 count := 0
299 for {
300 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000301 logger.Warnw("registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000302 if retries == count {
303 return err
304 }
305 count++
306 // Take a nap before retrying
307 time.Sleep(2 * time.Second)
308 } else {
309 break
310 }
311 }
312 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000313 logger.Info("registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000314 return nil
315}
316
317/**
318This function checks the liveliness and readiness of the kakfa and kv-client services
319and update the status in the probe.
320*/
321func (a *adapter) checkServicesReadiness(ctx context.Context) {
322 // checks the kafka readiness
323 go a.checkKafkaReadiness(ctx)
324
325 // checks the kv-store readiness
326 go a.checkKvStoreReadiness(ctx)
327}
328
329/**
330This function checks the liveliness and readiness of the kv-store service
331and update the status in the probe.
332*/
333func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
334 // dividing the live probe interval by 2 to get updated status every 30s
335 timeout := a.config.LiveProbeInterval / 2
336 kvStoreChannel := make(chan bool, 1)
337
338 // Default false to check the liveliness.
339 kvStoreChannel <- false
340 for {
341 timeoutTimer := time.NewTimer(timeout)
342 select {
343 case liveliness := <-kvStoreChannel:
344 if !liveliness {
345 // kv-store not reachable or down, updating the status to not ready state
346 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
347 timeout = a.config.NotLiveProbeInterval
348 } else {
349 // kv-store is reachable , updating the status to running state
350 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
351 timeout = a.config.LiveProbeInterval / 2
352 }
353 // Check if the timer has expired or not
354 if !timeoutTimer.Stop() {
355 <-timeoutTimer.C
356 }
357 case <-timeoutTimer.C:
358 // Check the status of the kv-store
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000359 logger.Info("kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000360 if a.kvClient.IsConnectionUp(ctx) {
361 kvStoreChannel <- true
362 } else {
363 kvStoreChannel <- false
364 }
365 }
366 }
367}
368
369/**
370This function checks the liveliness and readiness of the kafka service
371and update the status in the probe.
372*/
373func (a *adapter) checkKafkaReadiness(ctx context.Context) {
374 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
375 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
376 timeout := a.config.LiveProbeInterval
377 for {
378 timeoutTimer := time.NewTimer(timeout)
379
380 select {
381 case healthiness := <-healthinessChannel:
382 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000383 // logger.Fatal will call os.Exit(1) to terminate
384 logger.Fatal("Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000385 }
386 case liveliness := <-livelinessChannel:
387 if !liveliness {
388 // kafka not reachable or down, updating the status to not ready state
389 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
390 timeout = a.config.NotLiveProbeInterval
391 } else {
392 // kafka is reachable , updating the status to running state
393 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
394 timeout = a.config.LiveProbeInterval
395 }
396 // Check if the timer has expired or not
397 if !timeoutTimer.Stop() {
398 <-timeoutTimer.C
399 }
400 case <-timeoutTimer.C:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000401 logger.Info("kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000402 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
403 // the liveness probe may wait (and block) writing to our channel.
404 err := a.kafkaClient.SendLiveness()
405 if err != nil {
406 // Catch possible error case if sending liveness after Sarama has been stopped.
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000407 logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 }
409 }
410 }
411}
412
413// Adapter Utility methods ##### end #########
414// #############################################
415
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000416func getVerifiedCodeVersion() string {
417 if version.VersionInfo.Version == "unknown-version" {
418 content, err := ioutil.ReadFile("VERSION")
419 if err == nil {
420 return (string(content))
421 } else {
422 logger.Error("'VERSION'-file not readable")
423 }
424 }
425 return version.VersionInfo.Version
426}
427
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000428func printVersion(appName string) {
429 fmt.Println(appName)
430 fmt.Println(version.VersionInfo.String(" "))
431}
432
// printBanner writes the adapter's ASCII-art banner to standard output,
// one line at a time.
func printBanner() {
	banner := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, line := range banner {
		fmt.Println(line)
	}
}
444
445func waitForExit(ctx context.Context) int {
446 signalChannel := make(chan os.Signal, 1)
447 signal.Notify(signalChannel,
448 syscall.SIGHUP,
449 syscall.SIGINT,
450 syscall.SIGTERM,
451 syscall.SIGQUIT)
452
453 exitChannel := make(chan int)
454
455 go func() {
456 select {
457 case <-ctx.Done():
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000458 logger.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000459 exitChannel <- 2
460 case s := <-signalChannel:
461 switch s {
462 case syscall.SIGHUP,
463 syscall.SIGINT,
464 syscall.SIGTERM,
465 syscall.SIGQUIT:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000466 logger.Infow("closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000467 exitChannel <- 0
468 default:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000469 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000470 exitChannel <- 1
471 }
472 }
473 }()
474
475 code := <-exitChannel
476 return code
477}
478
479func main() {
480 start := time.Now()
481
482 cf := config.NewAdapterFlags()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000483 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000484 cf.ParseCommandArguments()
485
486 // Setup logging
487
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000488 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700489 if err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000490 logger.Fatalf("Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700491 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000492
493 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000494 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000495 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
496 }
497
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000498 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000499 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000500 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000501 }
502
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000503 log.SetAllLogLevel(logLevel)
504
505 realMain() //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000506
507 defer log.CleanUp()
508
509 // Print version / build information and exit
510 if cf.DisplayVersionOnly {
511 printVersion(defaultAppName)
512 return
513 } else {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000514 logger.Infow("config", log.Fields{"StartName": defaultAppName})
515 logger.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
516 logger.Infow("config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000517 }
518
519 // Print banner if specified
520 if cf.Banner {
521 printBanner()
522 }
523
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000524 logger.Infow("config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000525
526 ctx, cancel := context.WithCancel(context.Background())
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000527 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000528
529 ad := newAdapter(cf)
530
531 p := &probe.Probe{}
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000532 logger.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000533
534 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000535 logger.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000536
537 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
538
539 go func() {
540 err := ad.start(probeCtx)
541 // If this operation returns an error
542 // cancel all operations using this context
543 if err != nil {
544 cancel()
545 }
546 }()
547
548 code := waitForExit(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000549 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000550
551 // Cleanup before leaving
552 ad.stop(ctx)
553
554 elapsed := time.Since(start)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000555 logger.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
556 //logger.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000557}