blob: 80995a464afc9c2599af9dfd77b39e0f09c516be [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
28 "syscall"
29 "time"
30
dbainbri4d3a0dc2020-12-02 00:33:42 +000031 "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
32 "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
33 com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
34 conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
35 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
37 "github.com/opencord/voltha-lib-go/v4/pkg/log"
38 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
39 "github.com/opencord/voltha-lib-go/v4/pkg/version"
40 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
41 "github.com/opencord/voltha-protos/v4/go/voltha"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000042
Matteo Scandolo761f7512020-11-23 15:52:40 -080043 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
44 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000045)
46
47type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053048 //defaultAppName string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000049 instanceID string
50 config *config.AdapterFlags
51 iAdapter adapters.IAdapter // from Voltha interface adapters
52 kafkaClient kafka.Client
53 kvClient kvstore.Client
54 kip kafka.InterContainerProxy
55 coreProxy adapterif.CoreProxy
56 adapterProxy adapterif.AdapterProxy
57 eventProxy adapterif.EventProxy
58 halted bool
59 exitChannel chan int
60 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
61}
62
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063func newAdapter(cf *config.AdapterFlags) *adapter {
64 var a adapter
65 a.instanceID = cf.InstanceID
66 a.config = cf
67 a.halted = false
68 a.exitChannel = make(chan int, 1)
69 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
70 return &a
71}
72
73func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000074 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075 var err error
76
77 var p *probe.Probe
78 if value := ctx.Value(probe.ProbeContextKey); value != nil {
79 if _, ok := value.(*probe.Probe); ok {
80 p = value.(*probe.Probe)
81 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000082 ctx,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 "message-bus",
84 "kv-store",
85 "container-proxy",
86 "core-request-handler",
87 "register-with-core",
88 )
89 }
90 }
91
92 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000093 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
94 if err = a.setKVClient(ctx); err != nil {
95 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000096 }
97
98 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +000099 p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000100 }
101
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000102 // Setup Log Config
mpagenkoaf801632020-07-03 10:00:42 +0000103 /* address config update acc. to [VOL-2736] */
104 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000105 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 go conf.StartLogLevelConfigProcessing(cm, ctx)
107
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000108 // Setup Kafka Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000109 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
110 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 }
112
113 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000114 p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000115 }
116
117 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
118 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000119 logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000120 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
121 return err
122 }
123
124 // Create the core proxy to handle requests to the Core
dbainbri4d3a0dc2020-12-02 00:33:42 +0000125 a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000126
dbainbri4d3a0dc2020-12-02 00:33:42 +0000127 logger.Debugw(ctx, "create adapter proxy", log.Fields{"OltTopic": a.config.OltTopic, "CoreTopic": a.config.CoreTopic})
128 a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000129
130 // Create the event proxy to post events to KAFKA
131 a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
132
133 // Create the open ONU interface adapter
134 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800135 a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000136 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000137 }
138
139 // Register the core request handler
140 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000141 logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000142 }
143
144 // Register this adapter to the Core - retries indefinitely
145 if err = a.registerWithCore(ctx, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000146 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000147 }
148
149 // check the readiness and liveliness and update the probe status
150 a.checkServicesReadiness(ctx)
151 return err
152}
153
154func (a *adapter) stop(ctx context.Context) {
155 // Stop leadership tracking
156 a.halted = true
157
158 // send exit signal
159 a.exitChannel <- 0
160
161 // Cleanup - applies only if we had a kvClient
162 if a.kvClient != nil {
163 // Release all reservations
164 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000165 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000166 }
167 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000168 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 }
170
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000171 if a.kip != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000172 a.kip.Stop(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000173 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000174}
175
176// #############################################
177// Adapter Utility methods ##### begin #########
178
dbainbri4d3a0dc2020-12-02 00:33:42 +0000179func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
180 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000181 switch storeType {
182 case "consul":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000183 return kvstore.NewConsulClient(ctx, address, timeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000185 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000186 }
187 return nil, errors.New("unsupported-kv-store")
188}
189
dbainbri4d3a0dc2020-12-02 00:33:42 +0000190func newKafkaClient(ctx context.Context, clientType, host string, port int) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000191
dbainbri4d3a0dc2020-12-02 00:33:42 +0000192 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000193 /* address config update acc. to [VOL-2736] */
194 addr := host + ":" + strconv.Itoa(port)
195
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000196 switch clientType {
197 case "sarama":
198 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000199 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000200 kafka.ProducerReturnOnErrors(true),
201 kafka.ProducerReturnOnSuccess(true),
202 kafka.ProducerMaxRetries(6),
203 kafka.ProducerRetryBackoff(time.Millisecond*30),
204 kafka.MetadatMaxRetries(15)), nil
205 }
206
207 return nil, errors.New("unsupported-client-type")
208}
209
dbainbri4d3a0dc2020-12-02 00:33:42 +0000210func (a *adapter) setKVClient(ctx context.Context) error {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000211 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000212 client, err := newKVClient(ctx, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000213 if err != nil {
214 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000215 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000216 return err
217 }
218 a.kvClient = client
219 return nil
220}
221
222func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000223 logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000224 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
225 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000226 /* address config update acc. to [VOL-2736] */
227 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000228 kip := kafka.NewInterContainerProxy(
mpagenkoaf801632020-07-03 10:00:42 +0000229 kafka.InterContainerAddress(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000230 kafka.MsgClient(a.kafkaClient),
231 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000232 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000233 for {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000234 if err = kip.Start(ctx); err != nil {
235 logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000236 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 return nil, err
238 }
239 count++
240 // Take a nap before retrying
241 time.Sleep(2 * time.Second)
242 } else {
243 break
244 }
245 }
246 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000247 logger.Info(ctx, "common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248 return kip, nil
249}
250
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000251func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
252 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800253 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000254 var err error
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800255 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000256
257 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000258 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000259 return nil, err
260 }
261
dbainbri4d3a0dc2020-12-02 00:33:42 +0000262 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000263 return sAcONU, nil
264}
265
266func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000267 logger.Info(ctx, "setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000268 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000269 if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
270 logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 return err
272
273 }
274 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000275 logger.Info(ctx, "request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000276 return nil
277}
278
279func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000280 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000281 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000282 "adapterID": adapterID,
283 "currentReplica": a.config.CurrentReplica,
284 "totalReplicas": a.config.TotalReplicas,
285 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700286 adapterDescription := &voltha.Adapter{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000287 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000288 Vendor: "VOLTHA OpenONUGo",
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700289 Version: version.VersionInfo.Version,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000290 // TODO once we'll be ready to support multiple versions of the adapter
291 // the Endpoint will have to change to `brcm_openomci_onu_<currentReplica`>
Matteo Scandoloefbec272020-11-17 10:33:09 -0800292 Endpoint: a.config.Topic,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700293 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000294 CurrentReplica: int32(a.config.CurrentReplica),
295 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700296 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000297 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanellac82ad742020-12-11 16:02:36 +0100298 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM",
299 "ARPX", "DACM", "ERSN", "HWTC", "CIGG", "ADTN", "ARCA", "AVMG"},
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000300 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
301 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
302 AcceptsAddRemoveFlowUpdates: true}}
303 deviceTypes := &voltha.DeviceTypes{Items: types}
304 count := 0
305 for {
306 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000307 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000308 if retries == count {
309 return err
310 }
311 count++
312 // Take a nap before retrying
313 time.Sleep(2 * time.Second)
314 } else {
315 break
316 }
317 }
318 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000319 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000320 return nil
321}
322
323/**
324This function checks the liveliness and readiness of the kakfa and kv-client services
325and update the status in the probe.
326*/
327func (a *adapter) checkServicesReadiness(ctx context.Context) {
328 // checks the kafka readiness
329 go a.checkKafkaReadiness(ctx)
330
331 // checks the kv-store readiness
332 go a.checkKvStoreReadiness(ctx)
333}
334
335/**
336This function checks the liveliness and readiness of the kv-store service
337and update the status in the probe.
338*/
339func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
340 // dividing the live probe interval by 2 to get updated status every 30s
341 timeout := a.config.LiveProbeInterval / 2
342 kvStoreChannel := make(chan bool, 1)
343
344 // Default false to check the liveliness.
345 kvStoreChannel <- false
346 for {
347 timeoutTimer := time.NewTimer(timeout)
348 select {
349 case liveliness := <-kvStoreChannel:
350 if !liveliness {
351 // kv-store not reachable or down, updating the status to not ready state
352 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
353 timeout = a.config.NotLiveProbeInterval
354 } else {
355 // kv-store is reachable , updating the status to running state
356 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
357 timeout = a.config.LiveProbeInterval / 2
358 }
359 // Check if the timer has expired or not
360 if !timeoutTimer.Stop() {
361 <-timeoutTimer.C
362 }
363 case <-timeoutTimer.C:
364 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000365 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000366 if a.kvClient.IsConnectionUp(ctx) {
367 kvStoreChannel <- true
368 } else {
369 kvStoreChannel <- false
370 }
371 }
372 }
373}
374
375/**
376This function checks the liveliness and readiness of the kafka service
377and update the status in the probe.
378*/
379func (a *adapter) checkKafkaReadiness(ctx context.Context) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000380 livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
381 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000382 timeout := a.config.LiveProbeInterval
383 for {
384 timeoutTimer := time.NewTimer(timeout)
385
386 select {
387 case healthiness := <-healthinessChannel:
388 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000389 // logger.Fatal will call os.Exit(1) to terminate
dbainbri4d3a0dc2020-12-02 00:33:42 +0000390 logger.Fatal(ctx, "Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000391 }
392 case liveliness := <-livelinessChannel:
393 if !liveliness {
394 // kafka not reachable or down, updating the status to not ready state
395 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
396 timeout = a.config.NotLiveProbeInterval
397 } else {
398 // kafka is reachable , updating the status to running state
399 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
400 timeout = a.config.LiveProbeInterval
401 }
402 // Check if the timer has expired or not
403 if !timeoutTimer.Stop() {
404 <-timeoutTimer.C
405 }
406 case <-timeoutTimer.C:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000407 logger.Info(ctx, "kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
409 // the liveness probe may wait (and block) writing to our channel.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000410 err := a.kafkaClient.SendLiveness(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000411 if err != nil {
412 // Catch possible error case if sending liveness after Sarama has been stopped.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000413 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000414 }
415 }
416 }
417}
418
419// Adapter Utility methods ##### end #########
420// #############################################
421
dbainbri4d3a0dc2020-12-02 00:33:42 +0000422func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000423 if version.VersionInfo.Version == "unknown-version" {
424 content, err := ioutil.ReadFile("VERSION")
425 if err == nil {
426 return (string(content))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000427 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000428 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000429 }
430 return version.VersionInfo.Version
431}
432
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000433func printVersion(appName string) {
434 fmt.Println(appName)
435 fmt.Println(version.VersionInfo.String(" "))
436}
437
438func printBanner() {
439 fmt.Println(" ____ ____ ___ ___ _ ")
440 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
441 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
442 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
443 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
444 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
445 fmt.Println(" | | __| |")
446 fmt.Println(" |_| |____/")
447 fmt.Println(" ")
448}
449
450func waitForExit(ctx context.Context) int {
451 signalChannel := make(chan os.Signal, 1)
452 signal.Notify(signalChannel,
453 syscall.SIGHUP,
454 syscall.SIGINT,
455 syscall.SIGTERM,
456 syscall.SIGQUIT)
457
458 exitChannel := make(chan int)
459
460 go func() {
461 select {
462 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000463 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000464 exitChannel <- 2
465 case s := <-signalChannel:
466 switch s {
467 case syscall.SIGHUP,
468 syscall.SIGINT,
469 syscall.SIGTERM,
470 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000471 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000472 exitChannel <- 0
473 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000474 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000475 exitChannel <- 1
476 }
477 }
478 }()
479
480 code := <-exitChannel
481 return code
482}
483
484func main() {
485 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000486 ctx, cancel := context.WithCancel(context.Background())
487 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000488
489 cf := config.NewAdapterFlags()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000490 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000491 cf.ParseCommandArguments()
492
493 // Setup logging
494
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000495 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700496 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000497 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700498 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000499
500 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000501 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000502 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000503 }
504
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000505 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000506 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000507 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000508 }
509
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000510 log.SetAllLogLevel(logLevel)
511
dbainbri4d3a0dc2020-12-02 00:33:42 +0000512 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000513
Himani Chawla4d908332020-08-31 12:30:20 +0530514 defer func() {
515 _ = log.CleanUp()
516 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000517 // Print version / build information and exit
518 if cf.DisplayVersionOnly {
519 printVersion(defaultAppName)
520 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000521 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000522 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
523 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
524 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000525
526 // Print banner if specified
527 if cf.Banner {
528 printBanner()
529 }
530
dbainbri4d3a0dc2020-12-02 00:33:42 +0000531 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000532
533 ad := newAdapter(cf)
534
535 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000536 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000537
dbainbri4d3a0dc2020-12-02 00:33:42 +0000538 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
539 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000540
541 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
542
dbainbri4d3a0dc2020-12-02 00:33:42 +0000543 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
544 if err != nil {
545 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
546 } else {
547 defer log.TerminateTracing(closer)
548 }
549
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000550 go func() {
551 err := ad.start(probeCtx)
552 // If this operation returns an error
553 // cancel all operations using this context
554 if err != nil {
555 cancel()
556 }
557 }()
558
559 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000560 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000561
562 // Cleanup before leaving
563 ad.stop(ctx)
564
565 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000566 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
567 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000568}