blob: df7c327a3641f578afeeecc30c5c7b801d33e7c1 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000034 conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000035 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v3/pkg/log"
38 "github.com/opencord/voltha-lib-go/v3/pkg/probe"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000039 "github.com/opencord/voltha-lib-go/v3/pkg/version"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000040 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
41 "github.com/opencord/voltha-protos/v3/go/voltha"
42
Matteo Scandolo761f7512020-11-23 15:52:40 -080043 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
44 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000045)
46
47type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053048 //defaultAppName string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000074 logger.Info("Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
82 "message-bus",
83 "kv-store",
84 "container-proxy",
85 "core-request-handler",
86 "register-with-core",
87 )
88 }
89 }
90
91 // Setup KV Client
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000092 logger.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000093 if err = a.setKVClient(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000094 logger.Fatalw("error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 }
96
97 if p != nil {
98 p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
99 }
100
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000101 // Setup Log Config
mpagenkoaf801632020-07-03 10:00:42 +0000102 /* address config update acc. to [VOL-2736] */
103 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
104 cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 go conf.StartLogLevelConfigProcessing(cm, ctx)
106
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000107 // Setup Kafka Client
108 if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000109 logger.Fatalw("Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000110 }
111
112 if p != nil {
113 p.UpdateStatus("message-bus", probe.ServiceStatusRunning)
114 }
115
116 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
117 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000118 logger.Fatalw("error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000119 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
120 return err
121 }
122
123 // Create the core proxy to handle requests to the Core
124 a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
125
Holger Hildebrandta768fe92020-10-01 13:06:21 +0000126 logger.Debugw("create adapter proxy", log.Fields{"OltTopic": a.config.OltTopic, "CoreTopic": a.config.CoreTopic})
127 a.adapterProxy = com.NewAdapterProxy(a.kip, a.config.OltTopic, a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000128
129 // Create the event proxy to post events to KAFKA
130 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
131
132 // Create the open ONU interface adapter
133 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800134 a.config, cm); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000135 logger.Fatalw("error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000136 }
137
138 // Register the core request handler
139 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000140 logger.Fatalw("error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000141 }
142
143 // Register this adapter to the Core - retries indefinitely
144 if err = a.registerWithCore(ctx, -1); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000145 logger.Fatalw("error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000146 }
147
148 // check the readiness and liveliness and update the probe status
149 a.checkServicesReadiness(ctx)
150 return err
151}
152
153func (a *adapter) stop(ctx context.Context) {
154 // Stop leadership tracking
155 a.halted = true
156
157 // send exit signal
158 a.exitChannel <- 0
159
160 // Cleanup - applies only if we had a kvClient
161 if a.kvClient != nil {
162 // Release all reservations
163 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000164 logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000165 }
166 // Close the DB connection
167 a.kvClient.Close()
168 }
169
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000170 if a.kip != nil {
171 a.kip.Stop()
172 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000173}
174
175// #############################################
176// Adapter Utility methods ##### begin #########
177
mpagenkoaf801632020-07-03 10:00:42 +0000178func newKVClient(storeType, address string, timeout time.Duration) (kvstore.Client, error) {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000179 logger.Infow("kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000180 switch storeType {
181 case "consul":
182 return kvstore.NewConsulClient(address, timeout)
183 case "etcd":
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700184 return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000185 }
186 return nil, errors.New("unsupported-kv-store")
187}
188
189func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
190
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000191 logger.Infow("common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000192 /* address config update acc. to [VOL-2736] */
193 addr := host + ":" + strconv.Itoa(port)
194
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000195 switch clientType {
196 case "sarama":
197 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000198 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000199 kafka.ProducerReturnOnErrors(true),
200 kafka.ProducerReturnOnSuccess(true),
201 kafka.ProducerMaxRetries(6),
202 kafka.ProducerRetryBackoff(time.Millisecond*30),
203 kafka.MetadatMaxRetries(15)), nil
204 }
205
206 return nil, errors.New("unsupported-client-type")
207}
208
209func (a *adapter) setKVClient() error {
210 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
211 client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
212 if err != nil {
213 a.kvClient = nil
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000214 logger.Errorw("error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000215 return err
216 }
217 a.kvClient = client
218 return nil
219}
220
221func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000222 logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000223 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
224 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000225 /* address config update acc. to [VOL-2736] */
226 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000227 kip := kafka.NewInterContainerProxy(
mpagenkoaf801632020-07-03 10:00:42 +0000228 kafka.InterContainerAddress(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000229 kafka.MsgClient(a.kafkaClient),
230 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000231 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000232 for {
233 if err = kip.Start(); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000234 logger.Warnw("error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
235 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000236 return nil, err
237 }
238 count++
239 // Take a nap before retrying
240 time.Sleep(2 * time.Second)
241 } else {
242 break
243 }
244 }
245 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000246 logger.Info("common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000247 return kip, nil
248}
249
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000250func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
251 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800252 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000253 var err error
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800254 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000255
256 if err = sAcONU.Start(ctx); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000257 logger.Fatalw("error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000258 return nil, err
259 }
260
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000261 logger.Info("open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262 return sAcONU, nil
263}
264
265func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000266 logger.Info("setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000267 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
268 if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000269 logger.Errorw("request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000270 return err
271
272 }
273 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000274 logger.Info("request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000275 return nil
276}
277
278func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000279 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
280 logger.Infow("registering-with-core", log.Fields{
281 "adapterID": adapterID,
282 "currentReplica": a.config.CurrentReplica,
283 "totalReplicas": a.config.TotalReplicas,
284 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700285 adapterDescription := &voltha.Adapter{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000286 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000287 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700288 Version: version.VersionInfo.Version,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000289 // TODO once we'll be ready to support multiple versions of the adapter
290 // the Endpoint will have to change to `brcm_openomci_onu_<currentReplica`>
Matteo Scandoloefbec272020-11-17 10:33:09 -0800291 Endpoint: a.config.Topic,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700292 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000293 CurrentReplica: int32(a.config.CurrentReplica),
294 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700295 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000296 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Holger Hildebrandt6fd90c32020-09-28 10:47:41 +0000297 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG", "ADTN"},
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000298 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
299 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
300 AcceptsAddRemoveFlowUpdates: true}}
301 deviceTypes := &voltha.DeviceTypes{Items: types}
302 count := 0
303 for {
304 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000305 logger.Warnw("registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000306 if retries == count {
307 return err
308 }
309 count++
310 // Take a nap before retrying
311 time.Sleep(2 * time.Second)
312 } else {
313 break
314 }
315 }
316 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000317 logger.Info("registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000318 return nil
319}
320
321/**
322This function checks the liveliness and readiness of the kakfa and kv-client services
323and update the status in the probe.
324*/
325func (a *adapter) checkServicesReadiness(ctx context.Context) {
326 // checks the kafka readiness
327 go a.checkKafkaReadiness(ctx)
328
329 // checks the kv-store readiness
330 go a.checkKvStoreReadiness(ctx)
331}
332
333/**
334This function checks the liveliness and readiness of the kv-store service
335and update the status in the probe.
336*/
337func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
338 // dividing the live probe interval by 2 to get updated status every 30s
339 timeout := a.config.LiveProbeInterval / 2
340 kvStoreChannel := make(chan bool, 1)
341
342 // Default false to check the liveliness.
343 kvStoreChannel <- false
344 for {
345 timeoutTimer := time.NewTimer(timeout)
346 select {
347 case liveliness := <-kvStoreChannel:
348 if !liveliness {
349 // kv-store not reachable or down, updating the status to not ready state
350 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
351 timeout = a.config.NotLiveProbeInterval
352 } else {
353 // kv-store is reachable , updating the status to running state
354 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
355 timeout = a.config.LiveProbeInterval / 2
356 }
357 // Check if the timer has expired or not
358 if !timeoutTimer.Stop() {
359 <-timeoutTimer.C
360 }
361 case <-timeoutTimer.C:
362 // Check the status of the kv-store
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000363 logger.Info("kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000364 if a.kvClient.IsConnectionUp(ctx) {
365 kvStoreChannel <- true
366 } else {
367 kvStoreChannel <- false
368 }
369 }
370 }
371}
372
373/**
374This function checks the liveliness and readiness of the kafka service
375and update the status in the probe.
376*/
377func (a *adapter) checkKafkaReadiness(ctx context.Context) {
378 livelinessChannel := a.kafkaClient.EnableLivenessChannel(true)
379 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(true)
380 timeout := a.config.LiveProbeInterval
381 for {
382 timeoutTimer := time.NewTimer(timeout)
383
384 select {
385 case healthiness := <-healthinessChannel:
386 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000387 // logger.Fatal will call os.Exit(1) to terminate
388 logger.Fatal("Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000389 }
390 case liveliness := <-livelinessChannel:
391 if !liveliness {
392 // kafka not reachable or down, updating the status to not ready state
393 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
394 timeout = a.config.NotLiveProbeInterval
395 } else {
396 // kafka is reachable , updating the status to running state
397 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
398 timeout = a.config.LiveProbeInterval
399 }
400 // Check if the timer has expired or not
401 if !timeoutTimer.Stop() {
402 <-timeoutTimer.C
403 }
404 case <-timeoutTimer.C:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000405 logger.Info("kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000406 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
407 // the liveness probe may wait (and block) writing to our channel.
408 err := a.kafkaClient.SendLiveness()
409 if err != nil {
410 // Catch possible error case if sending liveness after Sarama has been stopped.
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000411 logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000412 }
413 }
414 }
415}
416
417// Adapter Utility methods ##### end #########
418// #############################################
419
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000420func getVerifiedCodeVersion() string {
421 if version.VersionInfo.Version == "unknown-version" {
422 content, err := ioutil.ReadFile("VERSION")
423 if err == nil {
424 return (string(content))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000425 }
Himani Chawla4d908332020-08-31 12:30:20 +0530426 logger.Error("'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000427 }
428 return version.VersionInfo.Version
429}
430
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000431func printVersion(appName string) {
432 fmt.Println(appName)
433 fmt.Println(version.VersionInfo.String(" "))
434}
435
436func printBanner() {
437 fmt.Println(" ____ ____ ___ ___ _ ")
438 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
439 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
440 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
441 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
442 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
443 fmt.Println(" | | __| |")
444 fmt.Println(" |_| |____/")
445 fmt.Println(" ")
446}
447
448func waitForExit(ctx context.Context) int {
449 signalChannel := make(chan os.Signal, 1)
450 signal.Notify(signalChannel,
451 syscall.SIGHUP,
452 syscall.SIGINT,
453 syscall.SIGTERM,
454 syscall.SIGQUIT)
455
456 exitChannel := make(chan int)
457
458 go func() {
459 select {
460 case <-ctx.Done():
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000461 logger.Infow("Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000462 exitChannel <- 2
463 case s := <-signalChannel:
464 switch s {
465 case syscall.SIGHUP,
466 syscall.SIGINT,
467 syscall.SIGTERM,
468 syscall.SIGQUIT:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000469 logger.Infow("closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000470 exitChannel <- 0
471 default:
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000472 logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000473 exitChannel <- 1
474 }
475 }
476 }()
477
478 code := <-exitChannel
479 return code
480}
481
482func main() {
483 start := time.Now()
484
485 cf := config.NewAdapterFlags()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000486 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000487 cf.ParseCommandArguments()
488
489 // Setup logging
490
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000491 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700492 if err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000493 logger.Fatalf("Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700494 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000495
496 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000497 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000498 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
499 }
500
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000501 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000502 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000503 logger.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000504 }
505
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000506 log.SetAllLogLevel(logLevel)
507
508 realMain() //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000509
Himani Chawla4d908332020-08-31 12:30:20 +0530510 defer func() {
511 _ = log.CleanUp()
512 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000513 // Print version / build information and exit
514 if cf.DisplayVersionOnly {
515 printVersion(defaultAppName)
516 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000517 }
Himani Chawla4d908332020-08-31 12:30:20 +0530518 logger.Infow("config", log.Fields{"StartName": defaultAppName})
519 logger.Infow("config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
520 logger.Infow("config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000521
522 // Print banner if specified
523 if cf.Banner {
524 printBanner()
525 }
526
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000527 logger.Infow("config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000528
529 ctx, cancel := context.WithCancel(context.Background())
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000530 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000531
532 ad := newAdapter(cf)
533
534 p := &probe.Probe{}
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000535 logger.Infow("resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000536
537 go p.ListenAndServe(fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000538 logger.Infow("probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000539
540 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
541
542 go func() {
543 err := ad.start(probeCtx)
544 // If this operation returns an error
545 // cancel all operations using this context
546 if err != nil {
547 cancel()
548 }
549 }()
550
551 code := waitForExit(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000552 logger.Infow("received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000553
554 // Cleanup before leaving
555 ad.stop(ctx)
556
557 elapsed := time.Since(start)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000558 logger.Infow("run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
559 //logger.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000560}