blob: b7867bb84034a6153dd3947d30661a92c26060d8 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Rohan Agrawalac066a02020-03-09 12:33:58 +000025import sys
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050026
27import arrow
28import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050029import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050030import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000031
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050032from simplejson import dumps
Neha Sharma61446d32020-03-23 14:32:31 +000033from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034from twisted.internet.task import LoopingCall
35from zope.interface import implementer
36
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000037from pyvoltha.common.structlog_setup import setup_logging, update_logging, string_to_int
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050038from pyvoltha.common.utils.asleep import asleep
39from pyvoltha.common.utils.deferred_utils import TimeOutError
40from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
41from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050042 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050043from pyvoltha.common.utils.registry import registry, IComponent
44from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
45from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
46from pyvoltha.adapters.kafka.core_proxy import CoreProxy
47from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050049from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050050from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050051
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050052from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000053from probe import Probe
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000054from pyvoltha.adapters.log_controller import LogController, KV_STORE_DATA_PATH_PREFIX
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000055
# Default settings for the adapter.  Each entry that calls os.environ.get()
# can be overridden through the named environment variable; parse_args() uses
# these values as the command-line defaults.
#
# NOTE(review): values read from the environment are always strings, so
# e.g. ACCEPT_BULK_FLOW=False yields the truthy string 'False' -- confirm how
# the consumers of accept_bulk_flow / accept_atomic_flow interpret them.
defs = dict(
    build_info_file='./BUILDINFO',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string: '\.' is a regex escape, not a valid Python string escape.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Used as a sleep delay (seconds) between registration retries, so it
    # must be numeric even when supplied as an environment string.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
    probe=os.environ.get('PROBE', ':8080'),
    log_level=os.environ.get('LOG_LEVEL', 'WARN'),
    component_name=os.environ.get('COMPONENT_NAME', "adapter-open-onu")
)
81
82
def parse_args():
    """Parse the adapter's command-line options.

    Most options default to the matching entry in ``defs``.  Help texts use
    argparse's ``%(default)s`` specifier so that argparse substitutes the
    default itself; pre-formatting the default into the help string would
    make ``--help`` fail whenever a default value contains a '%'.

    Returns:
        argparse.Namespace: the parsed options.  When
        ``--instance-id-is-container-name`` is given, ``instance_id`` is
        replaced by the result of ``get_my_containers_name()``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-c', '--config',
        dest='config',
        action='store',
        default=defs['config'],
        help='Path to openonu.yml config file (default: %(default)s). '
             'If relative, it is relative to main.py of openonu adapter.')

    parser.add_argument(
        '-X', '--container-number-extractor',
        dest='container_name_regex',
        action='store',
        default=defs['container_name_regex'],
        help='Regular expression for extracting container number from '
             'container name (default: %(default)s)')

    parser.add_argument(
        '-C', '--consul',
        dest='consul',
        action='store',
        default=defs['consul'],
        help='<hostname>:<port> to consul agent (default: %(default)s)')

    parser.add_argument(
        '-na', '--name',
        dest='name',
        action='store',
        default=defs['name'],
        help='name of this adapter (default: %(default)s)')

    parser.add_argument(
        '-ven', '--vendor',
        dest='vendor',
        action='store',
        default=defs['vendor'],
        help='vendor of this adapter (default: %(default)s)')

    parser.add_argument(
        '-dt', '--device_type',
        dest='device_type',
        action='store',
        default=defs['device_type'],
        help='supported device type of this adapter (default: %(default)s)')

    parser.add_argument(
        '-abf', '--accept_bulk_flow',
        dest='accept_bulk_flow',
        action='store',
        default=defs['accept_bulk_flow'],
        help='specifies whether the device type accepts bulk flow updates '
             'adapter (default: %(default)s)')

    parser.add_argument(
        '-aaf', '--accept_atomic_flow',
        dest='accept_atomic_flow',
        action='store',
        default=defs['accept_atomic_flow'],
        help='specifies whether the device type accepts add/remove flow '
             '(default: %(default)s)')

    parser.add_argument(
        '-e', '--etcd',
        dest='etcd',
        action='store',
        default=defs['etcd'],
        help='<hostname>:<port> to etcd server (default: %(default)s)')

    parser.add_argument(
        '-i', '--instance-id',
        dest='instance_id',
        action='store',
        default=defs['instance_id'],
        help='unique string id of this container instance '
             '(default: %(default)s)')

    parser.add_argument(
        '-I', '--interface',
        dest='interface',
        action='store',
        default=defs['interface'],
        help='ETH interface to receive (default: %(default)s)')

    parser.add_argument(
        '-n', '--no-banner',
        dest='no_banner',
        action='store_true',
        default=False,
        help='omit startup banner log lines')

    parser.add_argument(
        '-N', '--no-heartbeat',
        dest='no_heartbeat',
        action='store_true',
        default=False,
        help='do not emit periodic heartbeat log messages')

    parser.add_argument(
        '-l', '--log_level',
        dest='log_level',
        action='store',
        default=defs['log_level'],
        help='log level (default: %(default)s)')

    # NOTE(review): the value is stored as ``args.env`` and has no default
    # even though defs['component_name'] exists -- confirm whether that is
    # intended before changing it, since callers may read ``args.env``.
    parser.add_argument(
        '-cn', '--component_name',
        dest='env',
        action='store',
        help='get the component name')

    parser.add_argument(
        '--instance-id-is-container-name',
        dest='instance_id_is_container_name',
        action='store_true',
        default=False,
        help='use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')

    parser.add_argument(
        '-KA', '--kafka_adapter',
        dest='kafka_adapter',
        action='store',
        default=defs['kafka_adapter'],
        help='<hostname>:<port> of the kafka adapter broker '
             '(default: %(default)s). (If not specified (None), the '
             'address from the config file is used)')

    parser.add_argument(
        '-KC', '--kafka_cluster',
        dest='kafka_cluster',
        action='store',
        default=defs['kafka_cluster'],
        help='<hostname>:<port> of the kafka cluster broker '
             '(default: %(default)s). (If not specified (None), the '
             'address from the config file is used)')

    parser.add_argument(
        '-b', '--backend',
        default=defs['backend'],
        choices=['none', 'consul', 'etcd'],
        help='backend to use for config persistence')

    parser.add_argument(
        '-ct', '--core_topic',
        dest='core_topic',
        action='store',
        default=defs['core_topic'],
        help='topic of core on the kafka bus')

    parser.add_argument(
        '-et', '--event_topic',
        dest='event_topic',
        action='store',
        default=defs['event_topic'],
        help='topic of events on the kafka bus')

    parser.add_argument(
        '-P', '--probe',
        dest='probe',
        action='store',
        default=defs['probe'],
        help='<hostname>:<port> for liveness and readiness probes '
             '(default: %(default)s)')

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
259
260
def load_config(args):
    """Load the adapter's YAML configuration file.

    Args:
        args: parsed command-line options; only ``args.config`` (the path
            to the config file) is read.

    Returns:
        The parsed YAML content, or an empty dict when the file is empty.

    Raises:
        IOError/OSError: if the file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    path = args.config
    if path.startswith('.'):
        # Dot-relative paths are resolved against this module's directory
        # rather than the current working directory.
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load only constructs plain Python objects; yaml.load()
        # without an explicit Loader is deprecated and can execute
        # arbitrary constructors.
        config = yaml.safe_load(fd)
    # An empty document parses to None; callers expect a mapping.
    return config or {}
270
271
def get_build_info():
    """Read build metadata from the file named by defs['build_info_file'].

    A path that does not start with '/' is resolved against this module's
    directory.  The file is parsed with configparser and the ``version``,
    ``vcs_ref``, ``vcs_dirty`` and ``build_time`` options of the
    ``buildinfo`` section are returned; any option that cannot be looked up
    falls back to the string 'unknown'.

    Returns:
        types.SimpleNamespace: with the four attributes listed above.
    """
    info_path = defs['build_info_file']
    if not info_path.startswith('/'):
        module_dir = os.path.dirname(os.path.abspath(__file__))
        info_path = os.path.join(module_dir, info_path)
    info_path = os.path.abspath(info_path)

    parser = configparser.ConfigParser()
    parser.read(info_path)

    option_names = ('version', 'vcs_ref', 'vcs_dirty', 'build_time')
    return types.SimpleNamespace(**{
        option: parser.get('buildinfo', option, fallback='unknown')
        for option in option_names
    })
287
288
def print_banner(log):
    """Emit the ASCII-art startup banner, one ``log.info`` call per line.

    Args:
        log: any object exposing an ``info(message)`` method.
    """
    # Raw strings: the art is full of backslashes that are not valid
    # Python escape sequences and would otherwise trigger warnings.
    banner = (
        r' ___________ _____ _ _ _____ _ _ _ _ ',
        r' | _ | ___ \ ___| \ | | _ | \ | | | | | ',
        r' | | | | |_/ / |__ | \| | | | | \| | | | | ',
        r' | | | | __/| __|| . ` | | | | . ` | | | | ',
        r' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ',
        r' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ',
        ' ',
    )
    for line in banner:
        log.info(line)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500297
298
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenONU adapter process.

    Constructing an instance parses the command line, loads the config
    file, configures logging, begins start-up of the internal components
    and (unless disabled) schedules the kafka heartbeat.  ``start()`` then
    runs the Twisted reactor.
    """

    def __init__(self):
        """Parse options, set up logging and kick off component start-up.

        Raises:
            ValueError: if the configured log level is not recognised.
        """
        self.args = args = parse_args()
        self.config = load_config(args)

        # log levels in python are:
        # 1 - DEBUG => verbosity_adjust = 10
        # 2 - INFO => verbosity_adjust = 20
        # 3 - WARNING => verbosity_adjust = 30
        # 4 - ERROR => verbosity_adjust = 40
        # 5 - CRITICAL => verbosity_adjust = 50

        verbosity_adjust = string_to_int(str(args.log_level))

        # A result of 0 is treated as an unrecognised level name.
        if verbosity_adjust == 0:
            raise ValueError("Invalid loglevel is given: " + str(args.log_level))

        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.build_info = get_build_info()
        self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)

        if not args.no_banner:
            print_banner(self.log)

        # args.etcd is expected in '<host>:<port>' form.
        etcd_endpoint = str(args.etcd).split(':')
        self.etcd_host = etcd_endpoint[0]
        self.etcd_port = etcd_endpoint[1]

        self.controller = LogController(self.etcd_host, self.etcd_port)

        self.adapter = None
        # Probe TCP server; assigned by start_probe() once it is created.
        # Initialised here so shutdown_components() can tell whether the
        # probe ever started.
        self.server = None

        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = str(args.core_topic)
        self.event_topic = str(args.event_topic)
        self.listening_topic = str(args.name)
        self.startup_components()

        if not args.no_heartbeat:
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op; present so Main can be stopped like other components."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed AdapterConfig."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Start the kafka proxies and the adapter, then register with core.

        Probe flags are updated as each stage completes.  Any exception is
        logged and swallowed so that a start-up failure does not propagate
        to the caller.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()
            Probe.kafka_cluster_proxy_running = True
            Probe.kafka_proxy_faulty = False

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is
            # attached below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config,
                build_info=self.build_info)

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(
                adapter=self.adapter,
                core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()
            Probe.kafka_adapter_proxy_running = True

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)
            Probe.adapter_registered_with_core = True

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        # The probe server only exists if start_probe() got far enough to
        # create it; skip the shutdown otherwise instead of failing.
        if self.server is not None:
            self.server.shutdown()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Wire up reactor hooks, the probe and the log watcher, then run."""
        from twisted.internet import reactor, defer
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        # serve_forever() blocks, so the probe runs in a reactor thread.
        reactor.callInThread(self.start_probe)
        defer.maybeDeferred(self.controller.start_watch_log_config_change,
                            self.args.instance_id,
                            str(self.args.log_level))
        reactor.run()

    def start_probe(self):
        """Serve the Probe handler on args.probe ('<host>:<port>'); blocks."""
        args = self.args
        endpoint = args.probe.split(':')
        host = endpoint[0]
        port = endpoint[1]
        socketserver.TCPServer.allow_reuse_address = True
        self.server = socketserver.TCPServer((host, int(port)), Probe)
        self.server.serve_forever()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeouts.

        Args:
            retries: number of retries after a timeout; a negative value
                retries indefinitely.

        Returns (via Deferred): the response from ``core_proxy.register``.

        Raises:
            TimeOutError: when a timeout occurs and no retries remain.
            Exception: any other registration failure is logged and
                re-raised.
        """
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Schedule periodic heartbeat messages on the kafka cluster.

        Args:
            instance_id: value placed in the ``instance`` field of every
                heartbeat message.
        """
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic.  The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_heartbeat_msg():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    message['ts'] = arrow.utcnow().timestamp
                    self.log.debug('sending-kafka-heartbeat-message')

                    # Creating a handler to receive the message callbacks
                    df = Deferred()
                    df.addCallback(self.process_kafka_alive_state_update)
                    kafka_cluster_proxy.register_alive_state_update(df)
                    kafka_cluster_proxy.send_heartbeat_message(
                        topic, dumps(message))
                else:
                    Probe.kafka_cluster_proxy_running = False
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        def check_heartbeat_delivery():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    kafka_cluster_proxy.check_heartbeat_delivery()
            except Exception as e:
                self.log.exception('failed-checking-heartbeat-delivery', e=e)

        def schedule_periodic_heartbeat():
            try:
                # Sending the heartbeat message in a loop
                lc_heartbeat = LoopingCall(send_heartbeat_msg)
                lc_heartbeat.start(10)
                # Polling the delivery status more frequently to get early
                # notification
                lc_poll = LoopingCall(check_heartbeat_delivery)
                lc_poll.start(2)
            except Exception as e:
                self.log.exception('failed-kafka-heartbeat-startup', e=e)

        from twisted.internet import reactor
        # Delaying heartbeat initially to let kafka connection be established
        reactor.callLater(5, schedule_periodic_heartbeat)

    # Receiving the callback and updating the probe accordingly
    def process_kafka_alive_state_update(self, alive_state):
        """Record the kafka cluster proxy's alive/faulty state on the Probe.

        Args:
            alive_state: value stored as ``Probe.kafka_cluster_proxy_running``.
        """
        self.log.debug('process-kafka-alive-state-update',
                       alive_state=alive_state)
        Probe.kafka_cluster_proxy_running = alive_state

        kafka_cluster_proxy = get_kafka_proxy()
        if kafka_cluster_proxy:
            Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500563
# Script entry point: build the adapter's Main component and run it.
if __name__ == '__main__':
    adapter_main = Main()
    adapter_main.start()