#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Rohan Agrawalac066a02020-03-09 12:33:58 +000025import sys
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050026
27import arrow
28import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050029import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050030import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000031
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050032from simplejson import dumps
Neha Sharma61446d32020-03-23 14:32:31 +000033from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034from twisted.internet.task import LoopingCall
35from zope.interface import implementer
36
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000037from pyvoltha.common.structlog_setup import setup_logging, update_logging, string_to_int
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050038from pyvoltha.common.utils.asleep import asleep
39from pyvoltha.common.utils.deferred_utils import TimeOutError
40from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
41from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050042 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050043from pyvoltha.common.utils.registry import registry, IComponent
44from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
45from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
46from pyvoltha.adapters.kafka.core_proxy import CoreProxy
47from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050049from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050050from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050051
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050052from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000053from probe import Probe
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000054from pyvoltha.adapters.log_controller import LogController, KV_STORE_DATA_PATH_PREFIX
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000055
# Built-in defaults for the adapter. Most entries can be overridden through
# an environment variable, and parse_args() uses them as the defaults of the
# matching command-line options.
defs = dict(
    build_info_file='./BUILDINFO',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string: the backslashes belong to the regex, not to Python
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'brcm_openomci_onu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    # NOTE(review): when one of the ACCEPT_* variables is set, the value is
    # the raw environment string (e.g. 'false'), not a bool -- confirm that
    # consumers of these flags expect that.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    accept_incremental_evto_update=os.environ.get('ACCEPT_INCREMENTAL_EVTO_UPDATE', False),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Used as a sleep delay between registration retries, so it must be a
    # number even when it comes from the environment (which yields a string)
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
    probe=os.environ.get('PROBE', ':8080'),
    log_level=os.environ.get('LOG_LEVEL', 'WARN'),
    current_replica=1,
    total_replicas=1,
    component_name=os.environ.get('COMPONENT_NAME', "adapter-open-onu")
)
84
85
def parse_args():
    """Define the adapter's command-line options and parse the command line.

    Option defaults come from ``defs``. After parsing, if
    ``--instance-id-is-container-name`` was given, ``instance_id`` is
    replaced with the value returned by ``get_my_containers_name()``.

    :return: the populated ``argparse`` namespace
    """
    parser = argparse.ArgumentParser()

    def store(*flags, **options):
        """Register an option that stores the value supplied with it."""
        parser.add_argument(*flags, action='store', **options)

    def switch(*flags, **options):
        """Register a flag that is False unless present on the command line."""
        parser.add_argument(*flags, action='store_true', default=False,
                            **options)

    def with_default(template, key):
        """Fill the single %s slot of *template* with the default of *key*."""
        return template % defs[key]

    store('-c', '--config',
          dest='config',
          default=defs['config'],
          help=with_default(
              'Path to openonu.yml config file (default: %s). '
              'If relative, it is relative to main.py of openonu adapter.',
              'config'))

    store('-X', '--container-number-extractor',
          dest='container_name_regex',
          default=defs['container_name_regex'],
          help=with_default(
              'Regular expression for extracting conatiner number from '
              'container name (default: %s)',
              'container_name_regex'))

    store('-C', '--consul',
          dest='consul',
          default=defs['consul'],
          help=with_default(
              '<hostname>:<port> to consul agent (default: %s)', 'consul'))

    # NOTE this is really the adapter type
    store('-na', '--name',
          dest='name',
          default=defs['name'],
          help=with_default('name of this adapter (default: %s)', 'name'))

    store('-ven', '--vendor',
          dest='vendor',
          default=defs['vendor'],
          help=with_default('vendor of this adapter (default: %s)', 'vendor'))

    store('-dt', '--device_type',
          dest='device_type',
          default=defs['device_type'],
          help=with_default(
              'supported device type of this adapter (default: %s)',
              'device_type'))

    store('-abf', '--accept_bulk_flow',
          dest='accept_bulk_flow',
          default=defs['accept_bulk_flow'],
          help=with_default(
              'specifies whether the device type accepts bulk flow updates '
              'adapter (default: %s)',
              'accept_bulk_flow'))

    store('-aaf', '--accept_atomic_flow',
          dest='accept_atomic_flow',
          default=defs['accept_atomic_flow'],
          help=with_default(
              'specifies whether the device type accepts add/remove flow '
              '(default: %s)',
              'accept_atomic_flow'))

    store('-aie', '--accept_incremental_evto_update',
          dest='accept_incremental_evto_update',
          default=defs['accept_incremental_evto_update'],
          help=with_default(
              'specifies whether the adapter accepts incremental EVTO updates '
              '(default: %s)',
              'accept_incremental_evto_update'))

    store('-e', '--etcd',
          dest='etcd',
          default=defs['etcd'],
          help=with_default(
              '<hostname>:<port> to etcd server (default: %s)', 'etcd'))

    store('-i', '--instance-id',
          dest='instance_id',
          default=defs['instance_id'],
          help=with_default(
              'unique string id of this container instance (default: %s)',
              'instance_id'))

    store('-I', '--interface',
          dest='interface',
          default=defs['interface'],
          help=with_default(
              'ETH interface to recieve (default: %s)', 'interface'))

    switch('-n', '--no-banner',
           dest='no_banner',
           help='omit startup banner log lines')

    switch('-N', '--no-heartbeat',
           dest='no_heartbeat',
           help='do not emit periodic heartbeat log messages')

    store('-l', '--log_level',
          dest='log_level',
          default=defs['log_level'],
          help='enable logging')

    # NOTE(review): the value lands in ``args.env`` and has no default --
    # confirm whether that destination is intentional.
    store('-cn', '--component_name',
          dest='env',
          help='get the component name')

    switch('--instance-id-is-container-name',
           dest='instance_id_is_container_name',
           help=('use docker container name as conatiner instance id'
                 ' (overrides -i/--instance-id option)'))

    store('-KA', '--kafka_adapter',
          dest='kafka_adapter',
          default=defs['kafka_adapter'],
          help=with_default(
              '<hostname>:<port> of the kafka adapter broker (default: %s). ('
              'If not '
              'specified (None), the address from the config file is used',
              'kafka_adapter'))

    store('-KC', '--kafka_cluster',
          dest='kafka_cluster',
          default=defs['kafka_cluster'],
          help=with_default(
              '<hostname>:<port> of the kafka cluster broker (default: %s). ('
              'If not '
              'specified (None), the address from the config file is used',
              'kafka_cluster'))

    store('-b', '--backend',
          default=defs['backend'],
          choices=['none', 'consul', 'etcd'],
          help='backend to use for config persitence')

    store('-ct', '--core_topic',
          dest='core_topic',
          default=defs['core_topic'],
          help='topic of core on the kafka bus')

    store('-et', '--event_topic',
          dest='event_topic',
          default=defs['event_topic'],
          help='topic of events on the kafka bus')

    store('-P', '--probe',
          dest='probe',
          default=defs['probe'],
          help=with_default(
              '<hostname>:<port> for liveness and readiness probes (default: %s)',
              'probe'))

    store('--currentReplica',
          dest='current_replica',
          default=defs['current_replica'],
          type=int,
          help=with_default(
              'Replica number of this particular instance (default: %s)',
              'current_replica'))

    store('--totalReplicas',
          dest='total_replicas',
          default=defs['total_replicas'],
          type=int,
          help=with_default(
              'Total number of instances for this adapter (default: %s)',
              'total_replicas'))

    parsed = parser.parse_args()

    # post-processing: the container name takes precedence over -i
    if parsed.instance_id_is_container_name:
        parsed.instance_id = get_my_containers_name()

    return parsed
285
286
def load_config(args):
    """Read the adapter's YAML configuration file.

    A path that starts with '.' is resolved against the directory holding
    this module; any other path is made absolute as given.

    :param args: parsed command-line namespace; only ``args.config`` is read
    :return: the deserialized content of the YAML file
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load only builds plain Python objects; a bare yaml.load()
        # without a Loader can construct arbitrary objects and is rejected
        # by current PyYAML releases.
        return yaml.safe_load(fd)
296
297
def get_build_info():
    """Collect build metadata from the BUILDINFO ini file.

    The file location comes from ``defs['build_info_file']``; unless it starts
    with '/', it is resolved against the directory holding this module.

    :return: a ``types.SimpleNamespace`` with ``version``, ``vcs_ref``,
             ``vcs_dirty`` and ``build_time``; each is 'unknown' when the
             ``buildinfo`` section or the option is missing
    """
    location = defs['build_info_file']
    if not location.startswith('/'):
        here = os.path.dirname(os.path.abspath(__file__))
        location = os.path.join(here, location)
    location = os.path.abspath(location)

    ini = configparser.ConfigParser()
    ini.read(location)

    wanted = ('version', 'vcs_ref', 'vcs_dirty', 'build_time')
    return types.SimpleNamespace(**{
        option: ini.get('buildinfo', option, fallback='unknown')
        for option in wanted
    })
313
314
def print_banner(log):
    """Write the startup banner to *log*, one ``log.info`` call per row.

    :param log: any object exposing an ``info(message)`` method
    """
    # Raw strings keep the backslashes of the artwork literal
    banner_rows = (
        r' ___________ _____ _ _ _____ _ _ _ _ ',
        r' | _ | ___ \ ___| \ | | _ | \ | | | | | ',
        r' | | | | |_/ / |__ | \| | | | | \| | | | | ',
        r' | | | | __/| __|| . ` | | | | . ` | | | | ',
        r' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ',
        r' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ',
        r' ',
    )
    for row in banner_rows:
        log.info(row)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500323
324
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenONU adapter process.

    Construction parses the command line, loads the configuration, sets up
    logging, starts the internal components and (unless disabled) schedules
    the kafka heartbeat. ``start()`` then runs the twisted reactor.
    """

    def __init__(self):
        """Prepare the adapter: arguments, config, logging and components.

        :raises ValueError: when the requested log level converts to 0
        """
        self.args = args = parse_args()
        self.config = load_config(args)

        # log levels in python are:
        # 1 - DEBUG => verbosity_adjust = 10
        # 2 - INFO => verbosity_adjust = 20
        # 3 - WARNING => verbosity_adjust = 30
        # 4 - ERROR => verbosity_adjust = 40
        # 5 - CRITICAL => verbosity_adjust = 50

        verbosity_adjust = string_to_int(str(args.log_level))

        # A result of 0 is treated as "level name not recognised"
        if verbosity_adjust == 0:
            raise ValueError("Invalid loglevel is given: " + str(args.log_level))

        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.build_info = get_build_info()
        self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)

        if not args.no_banner:
            print_banner(self.log)

        # args.etcd is expected in <hostname>:<port> form
        etcd_endpoint = str(args.etcd).split(':')
        self.etcd_host = etcd_endpoint[0]
        self.etcd_port = etcd_endpoint[1]

        self.controller = LogController(self.etcd_host, self.etcd_port)

        self.adapter = None
        # Probe server; stays None until start_probe() has created it
        self.server = None

        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        # NOTE(review): ``timestamp`` is read as an attribute here; confirm
        # this matches the installed arrow version.
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = str(args.core_topic)
        self.event_topic = str(args.event_topic)
        self.listening_topic = "%s_%s" % (args.name, args.current_replica)
        self.id = "%s_%s" % (args.name, args.current_replica)
        self.startup_components()

        if not args.no_heartbeat:
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def start(self):
        """Run the adapter by starting the twisted reactor."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """Nothing to do; present so Main can be stopped like other components."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components, then register with core.

        Order: kafka cluster proxy, core/adapter proxies, the adapter itself,
        the kafka adapter messaging proxy, and finally registration with the
        core. The Probe flags are updated as each stage completes. Any
        exception is logged and not re-raised.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()
            Probe.kafka_cluster_proxy_running = True
            Probe.kafka_proxy_faulty = False

            config = self._get_adapter_config()

            # The proxies get their kafka_proxy once the messaging proxy
            # has been started further down
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                id=self.id,
                core_proxy=self.core_proxy,
                adapter_proxy=self.adapter_proxy,
                config=config,
                build_info=self.build_info,
                current_replica=self.args.current_replica,
                total_replicas=self.args.total_replicas,
                endpoint=self.listening_topic
            )

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                           core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.listening_topic,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()
            Probe.kafka_adapter_proxy_running = True

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)
            Probe.adapter_registered_with_core = True

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        # The probe server only exists if start_probe() got far enough to
        # create it; skip the shutdown otherwise instead of failing here
        if self.server is not None:
            self.server.shutdown()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        # NOTE(review): only daemon threads are joined here; non-daemon
        # threads are skipped -- confirm this is the intended selection.
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Hook up startup/shutdown callbacks and run the twisted reactor.

        Also launches the probe server in a reactor thread and starts the
        log-config watcher of the log controller.
        """
        from twisted.internet import reactor, defer
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.callInThread(self.start_probe)
        defer.maybeDeferred(self.controller.start_watch_log_config_change, self.args.instance_id, str(self.args.log_level))
        reactor.run()

    def start_probe(self):
        """Serve the Probe handler on the ``--probe`` address until shut down.

        Blocks in ``serve_forever()``, so it is meant to run in its own
        thread.
        """
        # args.probe is expected in <hostname>:<port> form
        endpoint = self.args.probe.split(':')
        host = endpoint[0]
        port = endpoint[1]
        socketserver.TCPServer.allow_reuse_address = True
        self.server = socketserver.TCPServer((host, int(port)), Probe)
        self.server.serve_forever()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeouts.

        :param retries: number of further attempts allowed after a timeout;
                        a negative value retries for ever
        :return: (via ``returnValue``) the response of the core
        :raises TimeOutError: when a timeout occurs and no retries are left
        :raises Exception: any other registration failure is logged and
                           re-raised
        """
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # negative counts are left untouched: retry for ever
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Schedule periodic heartbeat messages to the kafka cluster.

        After an initial 5 second delay a heartbeat is sent every 10 seconds
        and its delivery status is polled every 2 seconds.

        :param instance_id: value placed in the ``instance`` field of the
                            heartbeat message
        """
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic. The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_heartbeat_msg():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    message['ts'] = arrow.utcnow().timestamp
                    self.log.debug('sending-kafka-heartbeat-message')

                    # Creating a handler to receive the message callbacks
                    df = Deferred()
                    df.addCallback(self.process_kafka_alive_state_update)
                    kafka_cluster_proxy.register_alive_state_update(df)
                    kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
                else:
                    Probe.kafka_cluster_proxy_running = False
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        def check_heartbeat_delivery():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    kafka_cluster_proxy.check_heartbeat_delivery()
            except Exception as e:
                self.log.exception('failed-checking-heartbeat-delivery', e=e)

        def schedule_periodic_heartbeat():
            try:
                # Sending the heartbeat message in a loop
                lc_heartbeat = LoopingCall(send_heartbeat_msg)
                lc_heartbeat.start(10)
                # Polling the delivery status more frequently to get early notification
                lc_poll = LoopingCall(check_heartbeat_delivery)
                lc_poll.start(2)
            except Exception as e:
                self.log.exception('failed-kafka-heartbeat-startup', e=e)

        from twisted.internet import reactor
        # Delaying heartbeat initially to let kafka connection be established
        reactor.callLater(5, schedule_periodic_heartbeat)

    # Receiving the callback and updating the probe accordingly
    def process_kafka_alive_state_update(self, alive_state):
        """Mirror the kafka proxy's alive/faulty state onto the Probe flags.

        :param alive_state: value delivered through the alive-state deferred;
                            stored as ``Probe.kafka_cluster_proxy_running``
        """
        self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
        Probe.kafka_cluster_proxy_running = alive_state

        kafka_cluster_proxy = get_kafka_proxy()
        if kafka_cluster_proxy:
            Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500596
# Script entry point: build the adapter, then hand control to the reactor
if __name__ == '__main__':
    Main().start()