blob: 1b1807e5e068f6a18b7fb684785c2046dcb28ce6 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
dbainbri4d3a0dc2020-12-02 00:33:42 +000031 "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
34 conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
35 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
Himani Chawlac07fda02020-12-09 16:21:21 +053036 "github.com/opencord/voltha-lib-go/v4/pkg/events"
37 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
dbainbri4d3a0dc2020-12-02 00:33:42 +000038 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
39 "github.com/opencord/voltha-lib-go/v4/pkg/log"
40 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
41 "github.com/opencord/voltha-lib-go/v4/pkg/version"
42 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
43 "github.com/opencord/voltha-protos/v4/go/voltha"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000044
Matteo Scandolo761f7512020-11-23 15:52:40 -080045 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
46 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000047)
48
49type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053050 //defaultAppName string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000051 instanceID string
52 config *config.AdapterFlags
53 iAdapter adapters.IAdapter // from Voltha interface adapters
54 kafkaClient kafka.Client
55 kvClient kvstore.Client
56 kip kafka.InterContainerProxy
57 coreProxy adapterif.CoreProxy
58 adapterProxy adapterif.AdapterProxy
Himani Chawlac07fda02020-12-09 16:21:21 +053059 eventProxy eventif.EventProxy
Holger Hildebrandtfa074992020-03-27 15:42:06 +000060 halted bool
61 exitChannel chan int
62 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
63}
64
Holger Hildebrandtfa074992020-03-27 15:42:06 +000065func newAdapter(cf *config.AdapterFlags) *adapter {
66 var a adapter
67 a.instanceID = cf.InstanceID
68 a.config = cf
69 a.halted = false
70 a.exitChannel = make(chan int, 1)
71 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
72 return &a
73}
74
75func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000076 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000077 var err error
78
79 var p *probe.Probe
80 if value := ctx.Value(probe.ProbeContextKey); value != nil {
81 if _, ok := value.(*probe.Probe); ok {
82 p = value.(*probe.Probe)
83 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000084 ctx,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000085 "message-bus",
86 "kv-store",
87 "container-proxy",
88 "core-request-handler",
89 "register-with-core",
90 )
91 }
92 }
93
94 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000095 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
96 if err = a.setKVClient(ctx); err != nil {
97 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000098 }
99
100 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000101 p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000102 }
103
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000104 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800105 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 go conf.StartLogLevelConfigProcessing(cm, ctx)
107
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 // Setup Kafka Client
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800109 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000110 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 }
112
113 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000114 p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000115 }
116
117 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
118 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000119 logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000120 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
121 return err
122 }
123
124 // Create the core proxy to handle requests to the Core
dbainbri4d3a0dc2020-12-02 00:33:42 +0000125 a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000126
Andrea Campanella961734c2021-01-18 11:44:47 +0100127 logger.Debugw(ctx, "create adapter proxy", log.Fields{"CoreTopic": a.config.CoreTopic})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000128 a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000129
130 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530131 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000132
133 // Create the open ONU interface adapter
134 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800135 a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000136 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000137 }
138
139 // Register the core request handler
140 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000141 logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142 }
143
144 // Register this adapter to the Core - retries indefinitely
145 if err = a.registerWithCore(ctx, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000146 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000147 }
148
149 // check the readiness and liveliness and update the probe status
150 a.checkServicesReadiness(ctx)
151 return err
152}
153
154func (a *adapter) stop(ctx context.Context) {
155 // Stop leadership tracking
156 a.halted = true
157
158 // send exit signal
159 a.exitChannel <- 0
160
161 // Cleanup - applies only if we had a kvClient
162 if a.kvClient != nil {
163 // Release all reservations
164 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000165 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000166 }
167 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000168 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 }
170
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000171 if a.kip != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000172 a.kip.Stop(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000173 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000174}
175
176// #############################################
177// Adapter Utility methods ##### begin #########
178
dbainbri4d3a0dc2020-12-02 00:33:42 +0000179func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
180 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000181 switch storeType {
182 case "consul":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000183 return kvstore.NewConsulClient(ctx, address, timeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000185 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000186 }
187 return nil, errors.New("unsupported-kv-store")
188}
189
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800190func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000191
dbainbri4d3a0dc2020-12-02 00:33:42 +0000192 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000193
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000194 switch clientType {
195 case "sarama":
196 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000197 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000198 kafka.ProducerReturnOnErrors(true),
199 kafka.ProducerReturnOnSuccess(true),
200 kafka.ProducerMaxRetries(6),
201 kafka.ProducerRetryBackoff(time.Millisecond*30),
202 kafka.MetadatMaxRetries(15)), nil
203 }
204
205 return nil, errors.New("unsupported-client-type")
206}
207
dbainbri4d3a0dc2020-12-02 00:33:42 +0000208func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800209 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000210 if err != nil {
211 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000212 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000213 return err
214 }
215 a.kvClient = client
216 return nil
217}
218
219func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800220 logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"addr": a.config.KafkaAdapterAddress, "topic": a.config.Topic})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000221 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000222 /* address config update acc. to [VOL-2736] */
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000223 kip := kafka.NewInterContainerProxy(
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800224 kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000225 kafka.MsgClient(a.kafkaClient),
226 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000227 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000228 for {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000229 if err = kip.Start(ctx); err != nil {
230 logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000231 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000232 return nil, err
233 }
234 count++
235 // Take a nap before retrying
236 time.Sleep(2 * time.Second)
237 } else {
238 break
239 }
240 }
241 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000242 logger.Info(ctx, "common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000243 return kip, nil
244}
245
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000246func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
Himani Chawlac07fda02020-12-09 16:21:21 +0530247 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800248 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000249 var err error
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800250 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000251
252 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000253 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000254 return nil, err
255 }
256
dbainbri4d3a0dc2020-12-02 00:33:42 +0000257 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000258 return sAcONU, nil
259}
260
261func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000262 logger.Info(ctx, "setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000263 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000264 if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
265 logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000266 return err
267
268 }
269 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000270 logger.Info(ctx, "request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 return nil
272}
273
274func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000275 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100276 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000277 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000278 "adapterID": adapterID,
279 "currentReplica": a.config.CurrentReplica,
280 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100281 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000282 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700283 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100284 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
285 Vendor: "VOLTHA OpenONUGo",
286 Version: version.VersionInfo.Version,
Matteo Scandoloefbec272020-11-17 10:33:09 -0800287 Endpoint: a.config.Topic,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700288 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000289 CurrentReplica: int32(a.config.CurrentReplica),
290 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700291 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000292 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100293 VendorIds: vendorIdsList,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000294 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
295 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
296 AcceptsAddRemoveFlowUpdates: true}}
297 deviceTypes := &voltha.DeviceTypes{Items: types}
298 count := 0
299 for {
300 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000301 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000302 if retries == count {
303 return err
304 }
305 count++
306 // Take a nap before retrying
307 time.Sleep(2 * time.Second)
308 } else {
309 break
310 }
311 }
312 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000313 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000314 return nil
315}
316
317/**
318This function checks the liveliness and readiness of the kakfa and kv-client services
319and update the status in the probe.
320*/
321func (a *adapter) checkServicesReadiness(ctx context.Context) {
322 // checks the kafka readiness
323 go a.checkKafkaReadiness(ctx)
324
325 // checks the kv-store readiness
326 go a.checkKvStoreReadiness(ctx)
327}
328
329/**
330This function checks the liveliness and readiness of the kv-store service
331and update the status in the probe.
332*/
333func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
334 // dividing the live probe interval by 2 to get updated status every 30s
335 timeout := a.config.LiveProbeInterval / 2
336 kvStoreChannel := make(chan bool, 1)
337
338 // Default false to check the liveliness.
339 kvStoreChannel <- false
340 for {
341 timeoutTimer := time.NewTimer(timeout)
342 select {
343 case liveliness := <-kvStoreChannel:
344 if !liveliness {
345 // kv-store not reachable or down, updating the status to not ready state
346 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
347 timeout = a.config.NotLiveProbeInterval
348 } else {
349 // kv-store is reachable , updating the status to running state
350 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
351 timeout = a.config.LiveProbeInterval / 2
352 }
353 // Check if the timer has expired or not
354 if !timeoutTimer.Stop() {
355 <-timeoutTimer.C
356 }
357 case <-timeoutTimer.C:
358 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000359 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000360 if a.kvClient.IsConnectionUp(ctx) {
361 kvStoreChannel <- true
362 } else {
363 kvStoreChannel <- false
364 }
365 }
366 }
367}
368
369/**
370This function checks the liveliness and readiness of the kafka service
371and update the status in the probe.
372*/
373func (a *adapter) checkKafkaReadiness(ctx context.Context) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000374 livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
375 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000376 timeout := a.config.LiveProbeInterval
377 for {
378 timeoutTimer := time.NewTimer(timeout)
379
380 select {
381 case healthiness := <-healthinessChannel:
382 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000383 // logger.Fatal will call os.Exit(1) to terminate
dbainbri4d3a0dc2020-12-02 00:33:42 +0000384 logger.Fatal(ctx, "Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000385 }
386 case liveliness := <-livelinessChannel:
387 if !liveliness {
388 // kafka not reachable or down, updating the status to not ready state
389 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
390 timeout = a.config.NotLiveProbeInterval
391 } else {
392 // kafka is reachable , updating the status to running state
393 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
394 timeout = a.config.LiveProbeInterval
395 }
396 // Check if the timer has expired or not
397 if !timeoutTimer.Stop() {
398 <-timeoutTimer.C
399 }
400 case <-timeoutTimer.C:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000401 logger.Info(ctx, "kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000402 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
403 // the liveness probe may wait (and block) writing to our channel.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000404 err := a.kafkaClient.SendLiveness(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000405 if err != nil {
406 // Catch possible error case if sending liveness after Sarama has been stopped.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000407 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 }
409 }
410 }
411}
412
413// Adapter Utility methods ##### end #########
414// #############################################
415
dbainbri4d3a0dc2020-12-02 00:33:42 +0000416func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000417 if version.VersionInfo.Version == "unknown-version" {
418 content, err := ioutil.ReadFile("VERSION")
419 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100420 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000421 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000422 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000423 }
424 return version.VersionInfo.Version
425}
426
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000427func printVersion(appName string) {
428 fmt.Println(appName)
429 fmt.Println(version.VersionInfo.String(" "))
430}
431
// printBanner writes the ASCII-art start-up banner to standard output, one
// line per entry, followed by a line holding a single space.
func printBanner() {
	banner := [...]string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
443
444func waitForExit(ctx context.Context) int {
445 signalChannel := make(chan os.Signal, 1)
446 signal.Notify(signalChannel,
447 syscall.SIGHUP,
448 syscall.SIGINT,
449 syscall.SIGTERM,
450 syscall.SIGQUIT)
451
452 exitChannel := make(chan int)
453
454 go func() {
455 select {
456 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000457 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000458 exitChannel <- 2
459 case s := <-signalChannel:
460 switch s {
461 case syscall.SIGHUP,
462 syscall.SIGINT,
463 syscall.SIGTERM,
464 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000465 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000466 exitChannel <- 0
467 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000468 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000469 exitChannel <- 1
470 }
471 }
472 }()
473
474 code := <-exitChannel
475 return code
476}
477
478func main() {
479 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000480 ctx, cancel := context.WithCancel(context.Background())
481 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000482
483 cf := config.NewAdapterFlags()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000484 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000485 cf.ParseCommandArguments()
486
487 // Setup logging
488
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000489 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700490 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000491 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700492 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000493
494 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000495 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000496 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000497 }
498
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000499 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000500 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000501 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000502 }
503
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000504 log.SetAllLogLevel(logLevel)
505
dbainbri4d3a0dc2020-12-02 00:33:42 +0000506 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000507
Himani Chawla4d908332020-08-31 12:30:20 +0530508 defer func() {
509 _ = log.CleanUp()
510 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000511 // Print version / build information and exit
512 if cf.DisplayVersionOnly {
513 printVersion(defaultAppName)
514 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000515 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000516 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
517 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
518 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000519
520 // Print banner if specified
521 if cf.Banner {
522 printBanner()
523 }
524
dbainbri4d3a0dc2020-12-02 00:33:42 +0000525 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000526
527 ad := newAdapter(cf)
528
529 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000530 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000531
dbainbri4d3a0dc2020-12-02 00:33:42 +0000532 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
533 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000534
535 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
536
dbainbri4d3a0dc2020-12-02 00:33:42 +0000537 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
538 if err != nil {
539 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
540 } else {
541 defer log.TerminateTracing(closer)
542 }
543
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000544 go func() {
545 err := ad.start(probeCtx)
546 // If this operation returns an error
547 // cancel all operations using this context
548 if err != nil {
549 cancel()
550 }
551 }()
552
553 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000554 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000555
556 // Cleanup before leaving
557 ad.stop(ctx)
558
559 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000560 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
561 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000562}