blob: 5a2383228c86bc5de3918cdfcb0e2c3dbc981e25 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Rohan Agrawalac066a02020-03-09 12:33:58 +000025import sys
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050026
27import arrow
28import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050029import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050030import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000031
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050032from simplejson import dumps
Neha Sharma61446d32020-03-23 14:32:31 +000033from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034from twisted.internet.task import LoopingCall
35from zope.interface import implementer
36
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050037from pyvoltha.common.structlog_setup import setup_logging, update_logging
38from pyvoltha.common.utils.asleep import asleep
39from pyvoltha.common.utils.deferred_utils import TimeOutError
40from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
41from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050042 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050043from pyvoltha.common.utils.registry import registry, IComponent
44from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
45from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
46from pyvoltha.adapters.kafka.core_proxy import CoreProxy
47from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050049from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050050from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050051
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050052from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000053from probe import Probe
54
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050055defs = dict(
Matt Jeanneret08a8e862019-12-20 14:02:32 -050056 build_info_file='./BUILDINFO',
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050057 config=os.environ.get('CONFIG', './openonu.yml'),
58 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
59 '0-9]+)\..*$'),
60 consul=os.environ.get('CONSUL', 'localhost:8500'),
61 name=os.environ.get('NAME', 'openonu'),
62 vendor=os.environ.get('VENDOR', 'Voltha Project'),
63 device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
64 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
65 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
66 etcd=os.environ.get('ETCD', 'localhost:2379'),
67 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
Devmalya Paulffc89df2019-07-31 17:43:13 -040068 event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050069 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
70 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
71 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
72 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
73 backend=os.environ.get('BACKEND', 'none'),
74 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
75 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
Rohan Agrawalac066a02020-03-09 12:33:58 +000076 probe=os.environ.get('PROBE', ':8080'),
77 log_level=os.environ.get('LOG_LEVEL', 'WARN')
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050078)
79
80
81def parse_args():
82 parser = argparse.ArgumentParser()
83
84 _help = ('Path to openonu.yml config file (default: %s). '
85 'If relative, it is relative to main.py of openonu adapter.'
86 % defs['config'])
87 parser.add_argument('-c', '--config',
88 dest='config',
89 action='store',
90 default=defs['config'],
91 help=_help)
92
93 _help = 'Regular expression for extracting conatiner number from ' \
94 'container name (default: %s)' % defs['container_name_regex']
95 parser.add_argument('-X', '--container-number-extractor',
96 dest='container_name_regex',
97 action='store',
98 default=defs['container_name_regex'],
99 help=_help)
100
101 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
102 parser.add_argument('-C', '--consul',
103 dest='consul',
104 action='store',
105 default=defs['consul'],
106 help=_help)
107
108 _help = 'name of this adapter (default: %s)' % defs['name']
109 parser.add_argument('-na', '--name',
110 dest='name',
111 action='store',
112 default=defs['name'],
113 help=_help)
114
115 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
116 parser.add_argument('-ven', '--vendor',
117 dest='vendor',
118 action='store',
119 default=defs['vendor'],
120 help=_help)
121
122 _help = 'supported device type of this adapter (default: %s)' % defs[
123 'device_type']
124 parser.add_argument('-dt', '--device_type',
125 dest='device_type',
126 action='store',
127 default=defs['device_type'],
128 help=_help)
129
130 _help = 'specifies whether the device type accepts bulk flow updates ' \
131 'adapter (default: %s)' % defs['accept_bulk_flow']
132 parser.add_argument('-abf', '--accept_bulk_flow',
133 dest='accept_bulk_flow',
134 action='store',
135 default=defs['accept_bulk_flow'],
136 help=_help)
137
138 _help = 'specifies whether the device type accepts add/remove flow ' \
139 '(default: %s)' % defs['accept_atomic_flow']
140 parser.add_argument('-aaf', '--accept_atomic_flow',
141 dest='accept_atomic_flow',
142 action='store',
143 default=defs['accept_atomic_flow'],
144 help=_help)
145
146 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
147 parser.add_argument('-e', '--etcd',
148 dest='etcd',
149 action='store',
150 default=defs['etcd'],
151 help=_help)
152
153 _help = ('unique string id of this container instance (default: %s)'
154 % defs['instance_id'])
155 parser.add_argument('-i', '--instance-id',
156 dest='instance_id',
157 action='store',
158 default=defs['instance_id'],
159 help=_help)
160
161 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
162 parser.add_argument('-I', '--interface',
163 dest='interface',
164 action='store',
165 default=defs['interface'],
166 help=_help)
167
168 _help = 'omit startup banner log lines'
169 parser.add_argument('-n', '--no-banner',
170 dest='no_banner',
171 action='store_true',
172 default=False,
173 help=_help)
174
175 _help = 'do not emit periodic heartbeat log messages'
176 parser.add_argument('-N', '--no-heartbeat',
177 dest='no_heartbeat',
178 action='store_true',
179 default=False,
180 help=_help)
181
Rohan Agrawalac066a02020-03-09 12:33:58 +0000182 _help = 'enable logging'
183 parser.add_argument('-l', '--log_level',
184 dest='log_level',
185 action='store',
186 default=defs['log_level'],
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500187 help=_help)
188
189 _help = ('use docker container name as conatiner instance id'
190 ' (overrides -i/--instance-id option)')
191 parser.add_argument('--instance-id-is-container-name',
192 dest='instance_id_is_container_name',
193 action='store_true',
194 default=False,
195 help=_help)
196
197 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
198 'If not '
199 'specified (None), the address from the config file is used'
200 % defs['kafka_adapter'])
201 parser.add_argument('-KA', '--kafka_adapter',
202 dest='kafka_adapter',
203 action='store',
204 default=defs['kafka_adapter'],
205 help=_help)
206
207 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
208 'If not '
209 'specified (None), the address from the config file is used'
210 % defs['kafka_cluster'])
211 parser.add_argument('-KC', '--kafka_cluster',
212 dest='kafka_cluster',
213 action='store',
214 default=defs['kafka_cluster'],
215 help=_help)
216
217 _help = 'backend to use for config persitence'
218 parser.add_argument('-b', '--backend',
219 default=defs['backend'],
220 choices=['none', 'consul', 'etcd'],
221 help=_help)
222
223 _help = 'topic of core on the kafka bus'
224 parser.add_argument('-ct', '--core_topic',
225 dest='core_topic',
226 action='store',
227 default=defs['core_topic'],
228 help=_help)
229
Devmalya Paulffc89df2019-07-31 17:43:13 -0400230 _help = 'topic of events on the kafka bus'
231 parser.add_argument('-et', '--event_topic',
232 dest='event_topic',
233 action='store',
234 default=defs['event_topic'],
235 help=_help)
236
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000237 _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
238 parser.add_argument(
239 '-P', '--probe', dest='probe', action='store',
240 default=defs['probe'],
241 help=_help)
242
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500243 args = parser.parse_args()
244
245 # post-processing
246
247 if args.instance_id_is_container_name:
248 args.instance_id = get_my_containers_name()
249
250 return args
251
252
253def load_config(args):
254 path = args.config
255 if path.startswith('.'):
256 dir = os.path.dirname(os.path.abspath(__file__))
257 path = os.path.join(dir, path)
258 path = os.path.abspath(path)
259 with open(path) as fd:
260 config = yaml.load(fd)
261 return config
262
263
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500264def get_build_info():
265 path = defs['build_info_file']
266 if not path.startswith('/'):
267 dir = os.path.dirname(os.path.abspath(__file__))
268 path = os.path.join(dir, path)
269 path = os.path.abspath(path)
270 build_info = configparser.ConfigParser()
271 build_info.read(path)
272 results = types.SimpleNamespace(
273 version=build_info.get('buildinfo', 'version', fallback='unknown'),
274 vcs_ref=build_info.get('buildinfo', 'vcs_ref', fallback='unknown'),
275 vcs_dirty=build_info.get('buildinfo', 'vcs_dirty', fallback='unknown'),
276 build_time=build_info.get('buildinfo', 'build_time', fallback='unknown')
277 )
278 return results
279
280
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500281def print_banner(log):
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500282 log.info(' ___________ _____ _ _ _____ _ _ _ _ ')
283 log.info(' | _ | ___ \ ___| \ | | _ | \ | | | | | ')
284 log.info(' | | | | |_/ / |__ | \| | | | | \| | | | | ')
285 log.info(' | | | | __/| __|| . ` | | | | . ` | | | | ')
286 log.info(' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ')
287 log.info(' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ')
288 log.info(' ')
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500289
290
291@implementer(IComponent)
292class Main(object):
293
294 def __init__(self):
295
296 self.args = args = parse_args()
297 self.config = load_config(args)
298
Matteo Scandolod8d73172019-11-26 12:15:15 -0700299 # log levels in python are:
300 # 1 - DEBUG => verbosity_adjust = 0
301 # 2 - INFO => verbosity_adjust = 1
302 # 3 - WARNING => verbosity_adjust = 2
303 # 4 - ERROR
304 # 5 - CRITICAL
305 # If no flags are set we want to stick with INFO,
306 # if verbose is set we want to go down to DEBUG
307 # if quiet is set we want to go up to WARNING
308 # if you set both, you're doing something non-sense and you'll be back at INFO
309
Rohan Agrawalac066a02020-03-09 12:33:58 +0000310 verbosity_adjust = self.string_to_int(str(args.log_level))
311 if verbosity_adjust == -1:
312 print("Invalid loglevel is given: " + str(args.log_level))
313 sys.exit(0)
314
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500315 self.log = setup_logging(self.config.get('logging', {}),
316 args.instance_id,
317 verbosity_adjust=verbosity_adjust)
318 self.log.info('container-number-extractor',
319 regex=args.container_name_regex)
320
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500321 self.build_info = get_build_info()
322 self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500323
324 if not args.no_banner:
325 print_banner(self.log)
326
327 self.adapter = None
328 # Create a unique instance id using the passed-in instance id and
329 # UTC timestamp
330 current_time = arrow.utcnow().timestamp
331 self.instance_id = self.args.instance_id + '_' + str(current_time)
332
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -0500333 self.core_topic = str(args.core_topic)
334 self.event_topic = str(args.event_topic)
335 self.listening_topic = str(args.name)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500336 self.startup_components()
337
338 if not args.no_heartbeat:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500339 self.start_kafka_cluster_heartbeat(self.instance_id)
340
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500341 def start(self):
342 self.start_reactor() # will not return except Keyboard interrupt
343
Rohan Agrawalac066a02020-03-09 12:33:58 +0000344 def string_to_int(self, loglevel):
345 l = loglevel.upper()
346 if l == "DEBUG": return 0
347 elif l == "INFO": return 1
348 elif l == "WARN": return 2
349 elif l == "ERROR": return 3
350 elif l == "CRITICAL": return 4
351 else: return -1
352
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500353 def stop(self):
354 pass
355
356 def get_args(self):
357 """Allow access to command line args"""
358 return self.args
359
360 def get_config(self):
361 """Allow access to content of config file"""
362 return self.config
363
364 def _get_adapter_config(self):
365 cfg = AdapterConfig()
366 return cfg
367
368 @inlineCallbacks
369 def startup_components(self):
370 try:
371 self.log.info('starting-internal-components',
372 consul=self.args.consul,
373 etcd=self.args.etcd)
374
375 registry.register('main', self)
376
377 # Update the logger to output the vcore id.
378 self.log = update_logging(instance_id=self.instance_id,
379 vcore_id=None)
380
381 yield registry.register(
382 'kafka_cluster_proxy',
383 KafkaProxy(
384 self.args.consul,
385 self.args.kafka_cluster,
386 config=self.config.get('kafka-cluster-proxy', {})
387 )
388 ).start()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000389 Probe.kafka_cluster_proxy_running = True
Neha Sharma61446d32020-03-23 14:32:31 +0000390 Probe.kafka_proxy_faulty = False
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500391
392 config = self._get_adapter_config()
393
394 self.core_proxy = CoreProxy(
395 kafka_proxy=None,
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500396 default_core_topic=self.core_topic,
Devmalya Paulffc89df2019-07-31 17:43:13 -0400397 default_event_topic=self.event_topic,
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500398 my_listening_topic=self.listening_topic)
399
400 self.adapter_proxy = AdapterProxy(
401 kafka_proxy=None,
402 core_topic=self.core_topic,
403 my_listening_topic=self.listening_topic)
404
405 self.adapter = BrcmOpenomciOnuAdapter(
406 core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500407 config=config,
408 build_info=self.build_info)
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500409
Matt Jeannereta32441c2019-03-07 05:16:37 -0500410 self.adapter.start()
411
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500412 openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
413 core_proxy=self.core_proxy)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500414
415 yield registry.register(
416 'kafka_adapter_proxy',
417 IKafkaMessagingProxy(
418 kafka_host_port=self.args.kafka_adapter,
419 # TODO: Add KV Store object reference
420 kv_store=self.args.backend,
421 default_topic=self.args.name,
422 group_id_prefix=self.args.instance_id,
423 target_cls=openonu_request_handler
424 )
425 ).start()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000426 Probe.kafka_adapter_proxy_running = True
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500427
428 self.core_proxy.kafka_proxy = get_messaging_proxy()
429 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
430
431 # retry for ever
432 res = yield self._register_with_core(-1)
Neha Sharma61446d32020-03-23 14:32:31 +0000433 Probe.adapter_registered_with_core = True
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500434
435 self.log.info('started-internal-services')
436
437 except Exception as e:
438 self.log.exception('Failure-to-start-all-components', e=e)
439
440 @inlineCallbacks
441 def shutdown_components(self):
442 """Execute before the reactor is shut down"""
443 self.log.info('exiting-on-keyboard-interrupt')
444 for component in reversed(registry.iterate()):
445 yield component.stop()
446
Neha Sharma61446d32020-03-23 14:32:31 +0000447 self.server.shutdown()
448
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500449 import threading
450 self.log.info('THREADS:')
451 main_thread = threading.current_thread()
452 for t in threading.enumerate():
453 if t is main_thread:
454 continue
455 if not t.isDaemon():
456 continue
457 self.log.info('joining thread {} {}'.format(
458 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
459 t.join()
460
461 def start_reactor(self):
462 from twisted.internet import reactor
463 reactor.callWhenRunning(
464 lambda: self.log.info('twisted-reactor-started'))
465 reactor.addSystemEventTrigger('before', 'shutdown',
466 self.shutdown_components)
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000467 reactor.callInThread(self.start_probe)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500468 reactor.run()
469
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000470 def start_probe(self):
471 args = self.args
472 host = args.probe.split(':')[0]
473 port = args.probe.split(':')[1]
Neha Sharma61446d32020-03-23 14:32:31 +0000474 socketserver.TCPServer.allow_reuse_address = True
475 self.server = socketserver.TCPServer((host, int(port)), Probe)
476 self.server.serve_forever()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000477
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500478 @inlineCallbacks
479 def _register_with_core(self, retries):
480 while 1:
481 try:
482 resp = yield self.core_proxy.register(
483 self.adapter.adapter_descriptor(),
484 self.adapter.device_types())
485 if resp:
486 self.log.info('registered-with-core',
487 coreId=resp.instance_id)
488
489 returnValue(resp)
490 except TimeOutError as e:
491 self.log.warn("timeout-when-registering-with-core", e=e)
492 if retries == 0:
493 self.log.exception("no-more-retries", e=e)
494 raise
495 else:
496 retries = retries if retries < 0 else retries - 1
497 yield asleep(defs['retry_interval'])
498 except Exception as e:
499 self.log.exception("failed-registration", e=e)
500 raise
501
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500502 # Temporary function to send a heartbeat message to the external kafka
503 # broker
504 def start_kafka_cluster_heartbeat(self, instance_id):
505 # For heartbeat we will send a message to a specific "voltha-heartbeat"
506 # topic. The message is a protocol buf
507 # message
508 message = dict(
509 type='heartbeat',
510 adapter=self.args.name,
511 instance=instance_id,
512 ip=get_my_primary_local_ipv4()
513 )
514 topic = defs['heartbeat_topic']
515
Neha Sharma61446d32020-03-23 14:32:31 +0000516 def send_heartbeat_msg():
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500517 try:
518 kafka_cluster_proxy = get_kafka_proxy()
Neha Sharma61446d32020-03-23 14:32:31 +0000519 if kafka_cluster_proxy:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500520 message['ts'] = arrow.utcnow().timestamp
Neha Sharma61446d32020-03-23 14:32:31 +0000521 self.log.debug('sending-kafka-heartbeat-message')
522
523 # Creating a handler to receive the message callbacks
524 df = Deferred()
525 df.addCallback(self.process_kafka_alive_state_update)
526 kafka_cluster_proxy.register_alive_state_update(df)
527 kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500528 else:
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000529 Probe.kafka_cluster_proxy_running = False
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500530 self.log.error('kafka-proxy-unavailable')
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -0500531 except Exception as e:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500532 self.log.exception('failed-sending-message-heartbeat', e=e)
533
Neha Sharma61446d32020-03-23 14:32:31 +0000534 def check_heartbeat_delivery():
535 try:
536 kafka_cluster_proxy = get_kafka_proxy()
537 if kafka_cluster_proxy:
538 kafka_cluster_proxy.check_heartbeat_delivery()
539 except Exception as e:
540 self.log.exception('failed-checking-heartbeat-delivery', e=e)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500541
Neha Sharma61446d32020-03-23 14:32:31 +0000542 def schedule_periodic_heartbeat():
543 try:
544 # Sending the heartbeat message in a loop
545 lc_heartbeat = LoopingCall(send_heartbeat_msg)
546 lc_heartbeat.start(10)
547 # Polling the delivery status more frequently to get early notification
548 lc_poll = LoopingCall(check_heartbeat_delivery)
549 lc_poll.start(2)
550 except Exception as e:
551 self.log.exception('failed-kafka-heartbeat-startup', e=e)
552
553 from twisted.internet import reactor
554 # Delaying heartbeat initially to let kafka connection be established
555 reactor.callLater(5, schedule_periodic_heartbeat)
556
557 # Receiving the callback and updating the probe accordingly
558 def process_kafka_alive_state_update(self, alive_state):
559 self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
560 Probe.kafka_cluster_proxy_running = alive_state
561
562 kafka_cluster_proxy = get_kafka_proxy()
563 if kafka_cluster_proxy:
564 Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500565
566if __name__ == '__main__':
567 Main().start()