blob: b472c2e8c9d3ba6aba0e5b10ed49af8718607285 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/opencord/voltha-lib-go/v3/pkg/adapters"
	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
	"github.com/opencord/voltha-lib-go/v3/pkg/version"
	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
	"github.com/opencord/voltha-protos/v3/go/voltha"

	"test.internal/openadapter/internal/pkg/config"
	ac "test.internal/openadapter/internal/pkg/onuadaptercore"
)
46
47type adapter struct {
48 defaultAppName string
49 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000074 logger.Info("Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
82 "message-bus",
83 "kv-store",
84 "container-proxy",
85 "core-request-handler",
86 "register-with-core",
87 )
88 }
89 }
90
91 // Setup KV Client
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000092 logger.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000093 if err = a.setKVClient(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000094 logger.Fatalw("error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 }
96
97 if p != nil {
98 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
99 }
100
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000101 // Setup Log Config
mpagenkoaf801632020-07-03 10:00:42 +0000102 /* address config update acc. to [VOL-2736] */
103 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
104 cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 go conf.StartLogLevelConfigProcessing(cm, ctx)
106
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000107 // Setup Kafka Client
108 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000109 logger.Fatalw("Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000110 }
111
112 if p != nil {
113 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
114 }
115
116 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
117 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000118 logger.Fatalw("error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000119 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
120 return err
121 }
122
123 // Create the core proxy to handle requests to the Core
124 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
125
126 // Create the adaptor proxy to handle request between olt and onu
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000127 a.adapterProxy = com.NewAdapterProxy(a.kip, "openolt", a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000128
129 // Create the event proxy to post events to KAFKA
130 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
131
132 // Create the open ONU interface adapter
133 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
134 a.config); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000135 logger.Fatalw("error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000136 }
137
138 // Register the core request handler
139 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000140 logger.Fatalw("error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000141 }
142
143 // Register this adapter to the Core - retries indefinitely
144 if err = a.registerWithCore(ctx, -1); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000145 logger.Fatalw("error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000146 }
147
148 // check the readiness and liveliness and update the probe status
149 a.checkServicesReadiness(ctx)
150 return err
151}
152
153func (a *adapter) stop(ctx context.Context) {
154 // Stop leadership tracking
155 a.halted = true
156
157 // send exit signal
158 a.exitChannel <- 0
159
160 // Cleanup - applies only if we had a kvClient
161 if a.kvClient != nil {
162 // Release all reservations
163 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000164 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000165 }
166 // Close the DB connection
167 a.kvClient.Close()
168 }
169
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000170 if a.kip != nil {
171 a.kip.Stop()
172 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000173}
174
175// #############################################
176// Adapter Utility methods ##### begin #########
177
mpagenkoaf801632020-07-03 10:00:42 +0000178func newKVClient(storeType, address string, timeout time.Duration) (kvstore.Client, error) {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000179 logger.Infow("kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000180 switch storeType {
181 case "consul":
182 return kvstore.NewConsulClient(address, timeout)
183 case "etcd":
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700184 return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000185 }
186 return nil, errors.New("unsupported-kv-store")
187}
188
189func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
190
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000191 logger.Infow("common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000192 /* address config update acc. to [VOL-2736] */
193 addr := host + ":" + strconv.Itoa(port)
194
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000195 switch clientType {
196 case "sarama":
197 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000198 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000199 kafka.ProducerReturnOnErrors(true),
200 kafka.ProducerReturnOnSuccess(true),
201 kafka.ProducerMaxRetries(6),
202 kafka.ProducerRetryBackoff(time.Millisecond*30),
203 kafka.MetadatMaxRetries(15)), nil
204 }
205
206 return nil, errors.New("unsupported-client-type")
207}
208
209func (a *adapter) setKVClient() error {
210 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
211 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
212 if err != nil {
213 a.kvClient = nil
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000214 logger.Errorw("error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000215 return err
216 }
217 a.kvClient = client
218 return nil
219}
220
221func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000222 logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000223 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
224 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000225 /* address config update acc. to [VOL-2736] */
226 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000227 kip := kafka.NewInterContainerProxy(
mpagenkoaf801632020-07-03 10:00:42 +0000228 kafka.InterContainerAddress(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000229 kafka.MsgClient(a.kafkaClient),
230 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000231 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000232 for {
233 if err = kip.Start(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000234 logger.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
235 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000236 return nil, err
237 }
238 count++
239 // Take a nap before retrying
240 time.Sleep(2 * time.Second)
241 } else {
242 break
243 }
244 }
245 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000246 logger.Info("common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000247 return kip, nil
248}
249
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000250func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
251 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
252 cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
253 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000254 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000255
256 if err = sAcONU.Start(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000257 logger.Fatalw("error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000258 return nil, err
259 }
260
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000261 logger.Info("open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262 return sAcONU, nil
263}
264
265func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000266 logger.Info("setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000267 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
268 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000269 logger.Errorw("request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000270 return err
271
272 }
273 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000274 logger.Info("request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000275 return nil
276}
277
278func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000279 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
280 logger.Infow("registering-with-core", log.Fields{
281 "adapterID": adapterID,
282 "currentReplica": a.config.CurrentReplica,
283 "totalReplicas": a.config.TotalReplicas,
284 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700285 adapterDescription := &voltha.Adapter{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000286 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000287 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700288 Version: version.VersionInfo.Version,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000289 // TODO once we'll be ready to support multiple versions of the adapter
290 // the Endpoint will have to change to `brcm_openomci_onu_<currentReplica`>
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700291 Endpoint: "brcm_openomci_onu",
292 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000293 CurrentReplica: int32(a.config.CurrentReplica),
294 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700295 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000296 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
297 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM"},
298 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
299 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
300 AcceptsAddRemoveFlowUpdates: true}}
301 deviceTypes := &voltha.DeviceTypes{Items: types}
302 count := 0
303 for {
304 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000305 logger.Warnw("registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000306 if retries == count {
307 return err
308 }
309 count++
310 // Take a nap before retrying
311 time.Sleep(2 * time.Second)
312 } else {
313 break
314 }
315 }
316 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000317 logger.Info("registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000318 return nil
319}
320
321/**
322This function checks the liveliness and readiness of the kakfa and kv-client services
323and update the status in the probe.
324*/
325func (a *adapter) checkServicesReadiness(ctx context.Context) {
326 // checks the kafka readiness
327 go a.checkKafkaReadiness(ctx)
328
329 // checks the kv-store readiness
330 go a.checkKvStoreReadiness(ctx)
331}
332
333/**
334This function checks the liveliness and readiness of the kv-store service
335and update the status in the probe.
336*/
337func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
338 // dividing the live probe interval by 2 to get updated status every 30s
339 timeout := a.config.LiveProbeInterval / 2
340 kvStoreChannel := make(chan bool, 1)
341
342 // Default false to check the liveliness.
343 kvStoreChannel <- false
344 for {
345 timeoutTimer := time.NewTimer(timeout)
346 select {
347 case liveliness := <-kvStoreChannel:
348 if !liveliness {
349 // kv-store not reachable or down, updating the status to not ready state
350 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
351 timeout = a.config.NotLiveProbeInterval
352 } else {
353 // kv-store is reachable , updating the status to running state
354 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
355 timeout = a.config.LiveProbeInterval / 2
356 }
357 // Check if the timer has expired or not
358 if !timeoutTimer.Stop() {
359 <-timeoutTimer.C
360 }
361 case <-timeoutTimer.C:
362 // Check the status of the kv-store
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000363 logger.Info("kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000364 if a.kvClient.IsConnectionUp(ctx) {
365 kvStoreChannel <- true
366 } else {
367 kvStoreChannel <- false
368 }
369 }
370 }
371}
372
373/**
374This function checks the liveliness and readiness of the kafka service
375and update the status in the probe.
376*/
377func (a *adapter) checkKafkaReadiness(ctx context.Context) {
378 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
379 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
380 timeout := a.config.LiveProbeInterval
381 for {
382 timeoutTimer := time.NewTimer(timeout)
383
384 select {
385 case healthiness := <-healthinessChannel:
386 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000387 // logger.Fatal will call os.Exit(1) to terminate
388 logger.Fatal("Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000389 }
390 case liveliness := <-livelinessChannel:
391 if !liveliness {
392 // kafka not reachable or down, updating the status to not ready state
393 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
394 timeout = a.config.NotLiveProbeInterval
395 } else {
396 // kafka is reachable , updating the status to running state
397 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
398 timeout = a.config.LiveProbeInterval
399 }
400 // Check if the timer has expired or not
401 if !timeoutTimer.Stop() {
402 <-timeoutTimer.C
403 }
404 case <-timeoutTimer.C:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000405 logger.Info("kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000406 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
407 // the liveness probe may wait (and block) writing to our channel.
408 err := a.kafkaClient.SendLiveness()
409 if err != nil {
410 // Catch possible error case if sending liveness after Sarama has been stopped.
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000411 logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000412 }
413 }
414 }
415}
416
417// Adapter Utility methods ##### end #########
418// #############################################
419
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000420func getVerifiedCodeVersion() string {
421 if version.VersionInfo.Version == "unknown-version" {
422 content, err := ioutil.ReadFile("VERSION")
423 if err == nil {
424 return (string(content))
425 } else {
426 logger.Error("'VERSION'-file not readable")
427 }
428 }
429 return version.VersionInfo.Version
430}
431
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000432func printVersion(appName string) {
433 fmt.Println(appName)
434 fmt.Println(version.VersionInfo.String(" "))
435}
436
// printBanner writes the ASCII-art start-up banner to standard output, one
// line per entry.
//
// NOTE(review): the column alignment of the art looks collapsed - confirm the
// spacing of the lines against the intended banner.
func printBanner() {
	bannerLines := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, line := range bannerLines {
		fmt.Println(line)
	}
}
448
449func waitForExit(ctx context.Context) int {
450 signalChannel := make(chan os.Signal, 1)
451 signal.Notify(signalChannel,
452 syscall.SIGHUP,
453 syscall.SIGINT,
454 syscall.SIGTERM,
455 syscall.SIGQUIT)
456
457 exitChannel := make(chan int)
458
459 go func() {
460 select {
461 case <-ctx.Done():
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000462 logger.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000463 exitChannel <- 2
464 case s := <-signalChannel:
465 switch s {
466 case syscall.SIGHUP,
467 syscall.SIGINT,
468 syscall.SIGTERM,
469 syscall.SIGQUIT:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000470 logger.Infow("closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000471 exitChannel <- 0
472 default:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000473 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000474 exitChannel <- 1
475 }
476 }
477 }()
478
479 code := <-exitChannel
480 return code
481}
482
483func main() {
484 start := time.Now()
485
486 cf := config.NewAdapterFlags()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000487 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000488 cf.ParseCommandArguments()
489
490 // Setup logging
491
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000492 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700493 if err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000494 logger.Fatalf("Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700495 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000496
497 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000498 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000499 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
500 }
501
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000502 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000503 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000504 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000505 }
506
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000507 log.SetAllLogLevel(logLevel)
508
509 realMain() //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000510
511 defer log.CleanUp()
512
513 // Print version / build information and exit
514 if cf.DisplayVersionOnly {
515 printVersion(defaultAppName)
516 return
517 } else {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000518 logger.Infow("config", log.Fields{"StartName": defaultAppName})
519 logger.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
520 logger.Infow("config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000521 }
522
523 // Print banner if specified
524 if cf.Banner {
525 printBanner()
526 }
527
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000528 logger.Infow("config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000529
530 ctx, cancel := context.WithCancel(context.Background())
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000531 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000532
533 ad := newAdapter(cf)
534
535 p := &probe.Probe{}
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000536 logger.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000537
538 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000539 logger.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000540
541 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
542
543 go func() {
544 err := ad.start(probeCtx)
545 // If this operation returns an error
546 // cancel all operations using this context
547 if err != nil {
548 cancel()
549 }
550 }()
551
552 code := waitForExit(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000553 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000554
555 // Cleanup before leaving
556 ad.stop(ctx)
557
558 elapsed := time.Since(start)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000559 logger.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
560 //logger.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000561}