blob: e82c155c262497e7b0e69ef0627557ff64bedcfc [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
// Package main is the entry point of the OpenOnuAdapter.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
dbainbri4d3a0dc2020-12-02 00:33:42 +000031 "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
34 conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
35 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v4/pkg/log"
38 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v4/pkg/version"
40 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
41 "github.com/opencord/voltha-protos/v4/go/voltha"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000042
Matteo Scandolo761f7512020-11-23 15:52:40 -080043 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
44 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000045)
46
47type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053048 //defaultAppName string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000074 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000082 ctx,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 "message-bus",
84 "kv-store",
85 "container-proxy",
86 "core-request-handler",
87 "register-with-core",
88 )
89 }
90 }
91
92 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000093 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
94 if err = a.setKVClient(ctx); err != nil {
95 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000096 }
97
98 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +000099 p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000100 }
101
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000102 // Setup Log Config
mpagenkoaf801632020-07-03 10:00:42 +0000103 /* address config update acc. to [VOL-2736] */
104 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000105 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 go conf.StartLogLevelConfigProcessing(cm, ctx)
107
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 // Setup Kafka Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000109 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
110 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 }
112
113 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000114 p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000115 }
116
117 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
118 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000119 logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000120 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
121 return err
122 }
123
124 // Create the core proxy to handle requests to the Core
dbainbri4d3a0dc2020-12-02 00:33:42 +0000125 a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000126
dbainbri4d3a0dc2020-12-02 00:33:42 +0000127 logger.Debugw(ctx, "create adapter proxy", log.Fields{"OltTopic": a.config.OltTopic, "CoreTopic": a.config.CoreTopic})
128 a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000129
130 // Create the event proxy to post events to KAFKA
131 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
132
133 // Create the open ONU interface adapter
134 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800135 a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000136 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000137 }
138
139 // Register the core request handler
140 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000141 logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142 }
143
144 // Register this adapter to the Core - retries indefinitely
145 if err = a.registerWithCore(ctx, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000146 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000147 }
148
149 // check the readiness and liveliness and update the probe status
150 a.checkServicesReadiness(ctx)
151 return err
152}
153
154func (a *adapter) stop(ctx context.Context) {
155 // Stop leadership tracking
156 a.halted = true
157
158 // send exit signal
159 a.exitChannel <- 0
160
161 // Cleanup - applies only if we had a kvClient
162 if a.kvClient != nil {
163 // Release all reservations
164 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000165 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000166 }
167 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000168 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 }
170
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000171 if a.kip != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000172 a.kip.Stop(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000173 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000174}
175
176// #############################################
177// Adapter Utility methods ##### begin #########
178
dbainbri4d3a0dc2020-12-02 00:33:42 +0000179func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
180 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000181 switch storeType {
182 case "consul":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000183 return kvstore.NewConsulClient(ctx, address, timeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000185 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000186 }
187 return nil, errors.New("unsupported-kv-store")
188}
189
dbainbri4d3a0dc2020-12-02 00:33:42 +0000190func newKafkaClient(ctx context.Context, clientType, host string, port int) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000191
dbainbri4d3a0dc2020-12-02 00:33:42 +0000192 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000193 /* address config update acc. to [VOL-2736] */
194 addr := host + ":" + strconv.Itoa(port)
195
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000196 switch clientType {
197 case "sarama":
198 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000199 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000200 kafka.ProducerReturnOnErrors(true),
201 kafka.ProducerReturnOnSuccess(true),
202 kafka.ProducerMaxRetries(6),
203 kafka.ProducerRetryBackoff(time.Millisecond*30),
204 kafka.MetadatMaxRetries(15)), nil
205 }
206
207 return nil, errors.New("unsupported-client-type")
208}
209
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210func (a *adapter) setKVClient(ctx context.Context) error {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000211 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000212 client, err := newKVClient(ctx, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000213 if err != nil {
214 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000215 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000216 return err
217 }
218 a.kvClient = client
219 return nil
220}
221
222func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000223 logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000224 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
225 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000226 /* address config update acc. to [VOL-2736] */
227 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000228 kip := kafka.NewInterContainerProxy(
mpagenkoaf801632020-07-03 10:00:42 +0000229 kafka.InterContainerAddress(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230 kafka.MsgClient(a.kafkaClient),
231 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000232 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000233 for {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000234 if err = kip.Start(ctx); err != nil {
235 logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000236 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 return nil, err
238 }
239 count++
240 // Take a nap before retrying
241 time.Sleep(2 * time.Second)
242 } else {
243 break
244 }
245 }
246 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000247 logger.Info(ctx, "common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248 return kip, nil
249}
250
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000251func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
252 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800253 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000254 var err error
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800255 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000256
257 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000258 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000259 return nil, err
260 }
261
dbainbri4d3a0dc2020-12-02 00:33:42 +0000262 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000263 return sAcONU, nil
264}
265
266func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000267 logger.Info(ctx, "setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000268 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000269 if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
270 logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 return err
272
273 }
274 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000275 logger.Info(ctx, "request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000276 return nil
277}
278
279func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000280 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000281 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000282 "adapterID": adapterID,
283 "currentReplica": a.config.CurrentReplica,
284 "totalReplicas": a.config.TotalReplicas,
285 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700286 adapterDescription := &voltha.Adapter{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000287 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000288 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700289 Version: version.VersionInfo.Version,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000290 // TODO once we'll be ready to support multiple versions of the adapter
291 // the Endpoint will have to change to `brcm_openomci_onu_<currentReplica`>
Matteo Scandoloefbec272020-11-17 10:33:09 -0800292 Endpoint: a.config.Topic,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700293 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000294 CurrentReplica: int32(a.config.CurrentReplica),
295 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700296 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000297 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Holger Hildebrandt6fd90c32020-09-28 10:47:41 +0000298 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG", "ADTN"},
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000299 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
300 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
301 AcceptsAddRemoveFlowUpdates: true}}
302 deviceTypes := &voltha.DeviceTypes{Items: types}
303 count := 0
304 for {
305 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000306 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000307 if retries == count {
308 return err
309 }
310 count++
311 // Take a nap before retrying
312 time.Sleep(2 * time.Second)
313 } else {
314 break
315 }
316 }
317 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000318 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000319 return nil
320}
321
322/**
323This function checks the liveliness and readiness of the kakfa and kv-client services
324and update the status in the probe.
325*/
326func (a *adapter) checkServicesReadiness(ctx context.Context) {
327 // checks the kafka readiness
328 go a.checkKafkaReadiness(ctx)
329
330 // checks the kv-store readiness
331 go a.checkKvStoreReadiness(ctx)
332}
333
334/**
335This function checks the liveliness and readiness of the kv-store service
336and update the status in the probe.
337*/
338func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
339 // dividing the live probe interval by 2 to get updated status every 30s
340 timeout := a.config.LiveProbeInterval / 2
341 kvStoreChannel := make(chan bool, 1)
342
343 // Default false to check the liveliness.
344 kvStoreChannel <- false
345 for {
346 timeoutTimer := time.NewTimer(timeout)
347 select {
348 case liveliness := <-kvStoreChannel:
349 if !liveliness {
350 // kv-store not reachable or down, updating the status to not ready state
351 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
352 timeout = a.config.NotLiveProbeInterval
353 } else {
354 // kv-store is reachable , updating the status to running state
355 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
356 timeout = a.config.LiveProbeInterval / 2
357 }
358 // Check if the timer has expired or not
359 if !timeoutTimer.Stop() {
360 <-timeoutTimer.C
361 }
362 case <-timeoutTimer.C:
363 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000364 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000365 if a.kvClient.IsConnectionUp(ctx) {
366 kvStoreChannel <- true
367 } else {
368 kvStoreChannel <- false
369 }
370 }
371 }
372}
373
374/**
375This function checks the liveliness and readiness of the kafka service
376and update the status in the probe.
377*/
378func (a *adapter) checkKafkaReadiness(ctx context.Context) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000379 livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
380 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000381 timeout := a.config.LiveProbeInterval
382 for {
383 timeoutTimer := time.NewTimer(timeout)
384
385 select {
386 case healthiness := <-healthinessChannel:
387 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000388 // logger.Fatal will call os.Exit(1) to terminate
dbainbri4d3a0dc2020-12-02 00:33:42 +0000389 logger.Fatal(ctx, "Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000390 }
391 case liveliness := <-livelinessChannel:
392 if !liveliness {
393 // kafka not reachable or down, updating the status to not ready state
394 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
395 timeout = a.config.NotLiveProbeInterval
396 } else {
397 // kafka is reachable , updating the status to running state
398 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
399 timeout = a.config.LiveProbeInterval
400 }
401 // Check if the timer has expired or not
402 if !timeoutTimer.Stop() {
403 <-timeoutTimer.C
404 }
405 case <-timeoutTimer.C:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000406 logger.Info(ctx, "kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000407 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
408 // the liveness probe may wait (and block) writing to our channel.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000409 err := a.kafkaClient.SendLiveness(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410 if err != nil {
411 // Catch possible error case if sending liveness after Sarama has been stopped.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000412 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000413 }
414 }
415 }
416}
417
418// Adapter Utility methods ##### end #########
419// #############################################
420
dbainbri4d3a0dc2020-12-02 00:33:42 +0000421func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000422 if version.VersionInfo.Version == "unknown-version" {
423 content, err := ioutil.ReadFile("VERSION")
424 if err == nil {
425 return (string(content))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000426 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000427 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000428 }
429 return version.VersionInfo.Version
430}
431
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000432func printVersion(appName string) {
433 fmt.Println(appName)
434 fmt.Println(version.VersionInfo.String(" "))
435}
436
// printBanner writes the ASCII-art start-up banner to standard output, one
// line at a time.
func printBanner() {
	bannerLines := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, line := range bannerLines {
		fmt.Println(line)
	}
}
448
449func waitForExit(ctx context.Context) int {
450 signalChannel := make(chan os.Signal, 1)
451 signal.Notify(signalChannel,
452 syscall.SIGHUP,
453 syscall.SIGINT,
454 syscall.SIGTERM,
455 syscall.SIGQUIT)
456
457 exitChannel := make(chan int)
458
459 go func() {
460 select {
461 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000462 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000463 exitChannel <- 2
464 case s := <-signalChannel:
465 switch s {
466 case syscall.SIGHUP,
467 syscall.SIGINT,
468 syscall.SIGTERM,
469 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000470 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000471 exitChannel <- 0
472 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000473 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000474 exitChannel <- 1
475 }
476 }
477 }()
478
479 code := <-exitChannel
480 return code
481}
482
483func main() {
484 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000485 ctx, cancel := context.WithCancel(context.Background())
486 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000487
488 cf := config.NewAdapterFlags()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000489 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000490 cf.ParseCommandArguments()
491
492 // Setup logging
493
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000494 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700495 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000496 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700497 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000498
499 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000500 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000501 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000502 }
503
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000504 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000505 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000506 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000507 }
508
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000509 log.SetAllLogLevel(logLevel)
510
dbainbri4d3a0dc2020-12-02 00:33:42 +0000511 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000512
Himani Chawla4d908332020-08-31 12:30:20 +0530513 defer func() {
514 _ = log.CleanUp()
515 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000516 // Print version / build information and exit
517 if cf.DisplayVersionOnly {
518 printVersion(defaultAppName)
519 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000520 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000521 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
522 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
523 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000524
525 // Print banner if specified
526 if cf.Banner {
527 printBanner()
528 }
529
dbainbri4d3a0dc2020-12-02 00:33:42 +0000530 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000531
532 ad := newAdapter(cf)
533
534 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000535 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000536
dbainbri4d3a0dc2020-12-02 00:33:42 +0000537 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
538 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000539
540 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
541
dbainbri4d3a0dc2020-12-02 00:33:42 +0000542 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
543 if err != nil {
544 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
545 } else {
546 defer log.TerminateTracing(closer)
547 }
548
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000549 go func() {
550 err := ad.start(probeCtx)
551 // If this operation returns an error
552 // cancel all operations using this context
553 if err != nil {
554 cancel()
555 }
556 }()
557
558 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000559 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000560
561 // Cleanup before leaving
562 ad.stop(ctx)
563
564 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000565 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
566 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000567}