blob: 506321b50c6e7bddb928ae1a23b21edf8bbffcf2 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000017//Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000024 "io/ioutil"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
27 "strconv"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010028 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000029 "syscall"
30 "time"
31
dbainbri4d3a0dc2020-12-02 00:33:42 +000032 "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
33 "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
34 com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
35 conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
36 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
Himani Chawlac07fda02020-12-09 16:21:21 +053037 "github.com/opencord/voltha-lib-go/v4/pkg/events"
38 "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
dbainbri4d3a0dc2020-12-02 00:33:42 +000039 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
40 "github.com/opencord/voltha-lib-go/v4/pkg/log"
41 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
42 "github.com/opencord/voltha-lib-go/v4/pkg/version"
43 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
44 "github.com/opencord/voltha-protos/v4/go/voltha"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000045
Matteo Scandolo761f7512020-11-23 15:52:40 -080046 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
47 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/onuadaptercore"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000048)
49
50type adapter struct {
Himani Chawla4d908332020-08-31 12:30:20 +053051 //defaultAppName string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052 instanceID string
53 config *config.AdapterFlags
54 iAdapter adapters.IAdapter // from Voltha interface adapters
55 kafkaClient kafka.Client
56 kvClient kvstore.Client
57 kip kafka.InterContainerProxy
58 coreProxy adapterif.CoreProxy
59 adapterProxy adapterif.AdapterProxy
Himani Chawlac07fda02020-12-09 16:21:21 +053060 eventProxy eventif.EventProxy
Holger Hildebrandtfa074992020-03-27 15:42:06 +000061 halted bool
62 exitChannel chan int
63 receiverChannels []<-chan *ic.InterContainerMessage //from inter-container
64}
65
Holger Hildebrandtfa074992020-03-27 15:42:06 +000066func newAdapter(cf *config.AdapterFlags) *adapter {
67 var a adapter
68 a.instanceID = cf.InstanceID
69 a.config = cf
70 a.halted = false
71 a.exitChannel = make(chan int, 1)
72 a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
73 return &a
74}
75
76func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000077 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000078 var err error
79
80 var p *probe.Probe
81 if value := ctx.Value(probe.ProbeContextKey); value != nil {
82 if _, ok := value.(*probe.Probe); ok {
83 p = value.(*probe.Probe)
84 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000085 ctx,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000086 "message-bus",
87 "kv-store",
88 "container-proxy",
89 "core-request-handler",
90 "register-with-core",
91 )
92 }
93 }
94
95 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +000096 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
97 if err = a.setKVClient(ctx); err != nil {
98 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +000099 }
100
101 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000102 p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000103 }
104
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000105 // Setup Log Config
mpagenkoaf801632020-07-03 10:00:42 +0000106 /* address config update acc. to [VOL-2736] */
107 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000108 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000109 go conf.StartLogLevelConfigProcessing(cm, ctx)
110
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 // Setup Kafka Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000112 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
113 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000114 }
115
116 if p != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000117 p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000118 }
119
120 // Start the common InterContainer Proxy - retries as per program arguments or indefinitely per default
121 if a.kip, err = a.startInterContainerProxy(ctx, a.config.KafkaReconnectRetries); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000122 logger.Fatalw(ctx, "error-starting-inter-container-proxy", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000123 //aborting the complete processing here (does notmake sense after set Retry number [else use -1 for infinite])
124 return err
125 }
126
127 // Create the core proxy to handle requests to the Core
dbainbri4d3a0dc2020-12-02 00:33:42 +0000128 a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000129
Andrea Campanella961734c2021-01-18 11:44:47 +0100130 logger.Debugw(ctx, "create adapter proxy", log.Fields{"CoreTopic": a.config.CoreTopic})
dbainbri4d3a0dc2020-12-02 00:33:42 +0000131 a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000132
133 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530134 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000135
136 // Create the open ONU interface adapter
137 if a.iAdapter, err = a.startVolthaInterfaceAdapter(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800138 a.config, cm); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000139 logger.Fatalw(ctx, "error-starting-volthaInterfaceAdapter for OpenOnt", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000140 }
141
142 // Register the core request handler
143 if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000144 logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000145 }
146
147 // Register this adapter to the Core - retries indefinitely
148 if err = a.registerWithCore(ctx, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000149 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000150 }
151
152 // check the readiness and liveliness and update the probe status
153 a.checkServicesReadiness(ctx)
154 return err
155}
156
157func (a *adapter) stop(ctx context.Context) {
158 // Stop leadership tracking
159 a.halted = true
160
161 // send exit signal
162 a.exitChannel <- 0
163
164 // Cleanup - applies only if we had a kvClient
165 if a.kvClient != nil {
166 // Release all reservations
167 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000168 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000169 }
170 // Close the DB connection
dbainbri4d3a0dc2020-12-02 00:33:42 +0000171 a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000172 }
173
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000174 if a.kip != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000175 a.kip.Stop(ctx)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000176 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000177}
178
179// #############################################
180// Adapter Utility methods ##### begin #########
181
dbainbri4d3a0dc2020-12-02 00:33:42 +0000182func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
183 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000184 switch storeType {
185 case "consul":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000186 return kvstore.NewConsulClient(ctx, address, timeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000187 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000188 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000189 }
190 return nil, errors.New("unsupported-kv-store")
191}
192
dbainbri4d3a0dc2020-12-02 00:33:42 +0000193func newKafkaClient(ctx context.Context, clientType, host string, port int) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000194
dbainbri4d3a0dc2020-12-02 00:33:42 +0000195 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000196 /* address config update acc. to [VOL-2736] */
197 addr := host + ":" + strconv.Itoa(port)
198
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000199 switch clientType {
200 case "sarama":
201 return kafka.NewSaramaClient(
mpagenkoaf801632020-07-03 10:00:42 +0000202 kafka.Address(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000203 kafka.ProducerReturnOnErrors(true),
204 kafka.ProducerReturnOnSuccess(true),
205 kafka.ProducerMaxRetries(6),
206 kafka.ProducerRetryBackoff(time.Millisecond*30),
207 kafka.MetadatMaxRetries(15)), nil
208 }
209
210 return nil, errors.New("unsupported-client-type")
211}
212
dbainbri4d3a0dc2020-12-02 00:33:42 +0000213func (a *adapter) setKVClient(ctx context.Context) error {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000214 addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000215 client, err := newKVClient(ctx, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000216 if err != nil {
217 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000218 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000219 return err
220 }
221 a.kvClient = client
222 return nil
223}
224
225func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000226 logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000227 "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
228 var err error
mpagenkoaf801632020-07-03 10:00:42 +0000229 /* address config update acc. to [VOL-2736] */
230 addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000231 kip := kafka.NewInterContainerProxy(
mpagenkoaf801632020-07-03 10:00:42 +0000232 kafka.InterContainerAddress(addr),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000233 kafka.MsgClient(a.kafkaClient),
234 kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000235 count := 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000236 for {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000237 if err = kip.Start(ctx); err != nil {
238 logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err, "retry": retries, "count": count})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000239 if retries == count {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000240 return nil, err
241 }
242 count++
243 // Take a nap before retrying
244 time.Sleep(2 * time.Second)
245 } else {
246 break
247 }
248 }
249 probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000250 logger.Info(ctx, "common-messaging-proxy-created")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000251 return kip, nil
252}
253
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000254func (a *adapter) startVolthaInterfaceAdapter(ctx context.Context, kip kafka.InterContainerProxy,
Himani Chawlac07fda02020-12-09 16:21:21 +0530255 cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800256 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000257 var err error
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800258 sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000259
260 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000261 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262 return nil, err
263 }
264
dbainbri4d3a0dc2020-12-02 00:33:42 +0000265 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000266 return sAcONU, nil
267}
268
269func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000270 logger.Info(ctx, "setting-request-handler")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000271 requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000272 if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
273 logger.Errorw(ctx, "request-handler-setup-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000274 return err
275
276 }
277 probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000278 logger.Info(ctx, "request-handler-setup-done")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000279 return nil
280}
281
282func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000283 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100284 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000285 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000286 "adapterID": adapterID,
287 "currentReplica": a.config.CurrentReplica,
288 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100289 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000290 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700291 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100292 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
293 Vendor: "VOLTHA OpenONUGo",
294 Version: version.VersionInfo.Version,
Matteo Scandoloefbec272020-11-17 10:33:09 -0800295 Endpoint: a.config.Topic,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700296 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000297 CurrentReplica: int32(a.config.CurrentReplica),
298 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700299 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000300 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100301 VendorIds: vendorIdsList,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000302 Adapter: "brcm_openomci_onu", // Name of the adapter that handles device type
303 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
304 AcceptsAddRemoveFlowUpdates: true}}
305 deviceTypes := &voltha.DeviceTypes{Items: types}
306 count := 0
307 for {
308 if err := a.coreProxy.RegisterAdapter(context.TODO(), adapterDescription, deviceTypes); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000309 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000310 if retries == count {
311 return err
312 }
313 count++
314 // Take a nap before retrying
315 time.Sleep(2 * time.Second)
316 } else {
317 break
318 }
319 }
320 probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000321 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000322 return nil
323}
324
325/**
326This function checks the liveliness and readiness of the kakfa and kv-client services
327and update the status in the probe.
328*/
329func (a *adapter) checkServicesReadiness(ctx context.Context) {
330 // checks the kafka readiness
331 go a.checkKafkaReadiness(ctx)
332
333 // checks the kv-store readiness
334 go a.checkKvStoreReadiness(ctx)
335}
336
337/**
338This function checks the liveliness and readiness of the kv-store service
339and update the status in the probe.
340*/
341func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
342 // dividing the live probe interval by 2 to get updated status every 30s
343 timeout := a.config.LiveProbeInterval / 2
344 kvStoreChannel := make(chan bool, 1)
345
346 // Default false to check the liveliness.
347 kvStoreChannel <- false
348 for {
349 timeoutTimer := time.NewTimer(timeout)
350 select {
351 case liveliness := <-kvStoreChannel:
352 if !liveliness {
353 // kv-store not reachable or down, updating the status to not ready state
354 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
355 timeout = a.config.NotLiveProbeInterval
356 } else {
357 // kv-store is reachable , updating the status to running state
358 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
359 timeout = a.config.LiveProbeInterval / 2
360 }
361 // Check if the timer has expired or not
362 if !timeoutTimer.Stop() {
363 <-timeoutTimer.C
364 }
365 case <-timeoutTimer.C:
366 // Check the status of the kv-store
dbainbri4d3a0dc2020-12-02 00:33:42 +0000367 logger.Info(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000368 if a.kvClient.IsConnectionUp(ctx) {
369 kvStoreChannel <- true
370 } else {
371 kvStoreChannel <- false
372 }
373 }
374 }
375}
376
377/**
378This function checks the liveliness and readiness of the kafka service
379and update the status in the probe.
380*/
381func (a *adapter) checkKafkaReadiness(ctx context.Context) {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000382 livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
383 healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000384 timeout := a.config.LiveProbeInterval
385 for {
386 timeoutTimer := time.NewTimer(timeout)
387
388 select {
389 case healthiness := <-healthinessChannel:
390 if !healthiness {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000391 // logger.Fatal will call os.Exit(1) to terminate
dbainbri4d3a0dc2020-12-02 00:33:42 +0000392 logger.Fatal(ctx, "Kafka service has become unhealthy")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000393 }
394 case liveliness := <-livelinessChannel:
395 if !liveliness {
396 // kafka not reachable or down, updating the status to not ready state
397 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
398 timeout = a.config.NotLiveProbeInterval
399 } else {
400 // kafka is reachable , updating the status to running state
401 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
402 timeout = a.config.LiveProbeInterval
403 }
404 // Check if the timer has expired or not
405 if !timeoutTimer.Stop() {
406 <-timeoutTimer.C
407 }
408 case <-timeoutTimer.C:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000409 logger.Info(ctx, "kafka-proxy-liveness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
411 // the liveness probe may wait (and block) writing to our channel.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000412 err := a.kafkaClient.SendLiveness(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000413 if err != nil {
414 // Catch possible error case if sending liveness after Sarama has been stopped.
dbainbri4d3a0dc2020-12-02 00:33:42 +0000415 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000416 }
417 }
418 }
419}
420
421// Adapter Utility methods ##### end #########
422// #############################################
423
dbainbri4d3a0dc2020-12-02 00:33:42 +0000424func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000425 if version.VersionInfo.Version == "unknown-version" {
426 content, err := ioutil.ReadFile("VERSION")
427 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100428 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000429 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000430 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000431 }
432 return version.VersionInfo.Version
433}
434
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000435func printVersion(appName string) {
436 fmt.Println(appName)
437 fmt.Println(version.VersionInfo.String(" "))
438}
439
440func printBanner() {
441 fmt.Println(" ____ ____ ___ ___ _ ")
442 fmt.Println(" / __ \\ / __ \\| \\ \\ | | | | |")
443 fmt.Println(" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____")
444 fmt.Println(" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\")
445 fmt.Println(" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |")
446 fmt.Println(" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./")
447 fmt.Println(" | | __| |")
448 fmt.Println(" |_| |____/")
449 fmt.Println(" ")
450}
451
452func waitForExit(ctx context.Context) int {
453 signalChannel := make(chan os.Signal, 1)
454 signal.Notify(signalChannel,
455 syscall.SIGHUP,
456 syscall.SIGINT,
457 syscall.SIGTERM,
458 syscall.SIGQUIT)
459
460 exitChannel := make(chan int)
461
462 go func() {
463 select {
464 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000465 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000466 exitChannel <- 2
467 case s := <-signalChannel:
468 switch s {
469 case syscall.SIGHUP,
470 syscall.SIGINT,
471 syscall.SIGTERM,
472 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000473 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000474 exitChannel <- 0
475 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000476 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000477 exitChannel <- 1
478 }
479 }
480 }()
481
482 code := <-exitChannel
483 return code
484}
485
486func main() {
487 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000488 ctx, cancel := context.WithCancel(context.Background())
489 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000490
491 cf := config.NewAdapterFlags()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000492 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000493 cf.ParseCommandArguments()
494
495 // Setup logging
496
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000497 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700498 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000499 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700500 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000501
502 // Setup default logger - applies for packages that do not have specific logger set
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000503 if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000504 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000505 }
506
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000507 // Update all loggers (provisioned via init) with a common field
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000508 if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000509 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000510 }
511
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000512 log.SetAllLogLevel(logLevel)
513
dbainbri4d3a0dc2020-12-02 00:33:42 +0000514 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000515
Himani Chawla4d908332020-08-31 12:30:20 +0530516 defer func() {
517 _ = log.CleanUp()
518 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000519 // Print version / build information and exit
520 if cf.DisplayVersionOnly {
521 printVersion(defaultAppName)
522 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000523 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000524 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
525 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
526 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000527
528 // Print banner if specified
529 if cf.Banner {
530 printBanner()
531 }
532
dbainbri4d3a0dc2020-12-02 00:33:42 +0000533 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000534
535 ad := newAdapter(cf)
536
537 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000538 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000539
dbainbri4d3a0dc2020-12-02 00:33:42 +0000540 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
541 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000542
543 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
544
dbainbri4d3a0dc2020-12-02 00:33:42 +0000545 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
546 if err != nil {
547 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
548 } else {
549 defer log.TerminateTracing(closer)
550 }
551
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000552 go func() {
553 err := ad.start(probeCtx)
554 // If this operation returns an error
555 // cancel all operations using this context
556 if err != nil {
557 cancel()
558 }
559 }()
560
561 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000562 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000563
564 // Cleanup before leaving
565 ad.stop(ctx)
566
567 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000568 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
569 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000570}