blob: 8bff56d7f205cac7318bc133d2fe598189085e2d [file] [log] [blame]
Chip Boling8e042f62019-02-12 16:14:34 -06001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Adtran ONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
32from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
37 get_my_primary_interface
38from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
43 get_messaging_proxy
44from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from adtran_onu import AdtranOnuAdapter
46from pyvoltha.protos import third_party
47from pyvoltha.protos.adapter_pb2 import AdapterConfig
Chip Bolingd2d7a4d2019-03-14 14:34:56 -050048import sys
Chip Boling8e042f62019-02-12 16:14:34 -060049
50_ = third_party
51
52
# Runtime defaults for the adapter.  Each os.environ.get() value can be
# overridden by the named environment variable, and most can be further
# overridden on the command line (see parse_args()).
defs = {
    'version_file': '/voltha/VERSION',
    'config': os.environ.get('CONFIG', './adapters-adtran-onu.yml'),
    'container_name_regex': os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                           '^.*\.([0-9]+)\..*$'),
    'consul': os.environ.get('CONSUL', 'localhost:8500'),
    'name': os.environ.get('NAME', 'adtran_onu'),
    'vendor': os.environ.get('VENDOR', 'Voltha Project'),
    'device_type': os.environ.get('DEVICE_TYPE', 'adtran_onu'),
    'accept_bulk_flow': os.environ.get('ACCEPT_BULK_FLOW', True),
    'accept_atomic_flow': os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    'etcd': os.environ.get('ETCD', 'localhost:2379'),
    'core_topic': os.environ.get('CORE_TOPIC', 'rwcore'),
    'interface': os.environ.get('INTERFACE', get_my_primary_interface()),
    'instance_id': os.environ.get('INSTANCE_ID',
                                  os.environ.get('HOSTNAME', '1')),
    'kafka_adapter': os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
    'kafka_cluster': os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
    'backend': os.environ.get('BACKEND', 'none'),
    'retry_interval': os.environ.get('RETRY_INTERVAL', 2),
    'heartbeat_topic': os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),

    # Following are for debugging
    'debug_enabled': True,
    'debug_host': 'work.bcsw.net',
    # 'debug_host': '10.0.2.15',
    'debug_port': 8765,
}
80
81
def parse_args():
    """Parse the Adtran ONU adapter's command-line arguments.

    Every option defaults to the matching entry in the module-level
    ``defs`` dict (which itself honors environment variables), giving a
    CLI > environment > built-in default precedence order.

    Returns:
        argparse.Namespace: parsed arguments.  When
        ``--instance-id-is-container-name`` is set, ``instance_id`` is
        replaced by the docker container name.
    """
    parser = argparse.ArgumentParser()

    _help = ('Path to adapters-adtran_onu.yml config file (default: %s). '
             'If relative, it is relative to main.py of Adtran ONU adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            'adapter (default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    # Fixed typo: "recieve" -> "receive"
    _help = 'ETH interface to receive (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    # Fixed unbalanced parentheses in the two kafka help strings below.
    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file '
             'is used.'
             % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file '
             'is used.'
             % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    _help = 'Enable remote python debug'
    parser.add_argument('-de', '--debug_enabled',
                        dest='debug_enabled',
                        action='store_true',
                        default=defs['debug_enabled'],
                        help=_help)

    _help = 'remote debug hostname or IP address'
    parser.add_argument('-dh', '--debug_host',
                        dest='debug_host',
                        action='store',
                        default=defs['debug_host'],
                        help=_help)

    _help = 'remote debug port number'
    parser.add_argument('-dp', '--debug_port',
                        dest='debug_port',
                        action='store',
                        default=defs['debug_port'],
                        help=_help)

    args = parser.parse_args()

    # post-processing: optionally derive the instance id from the
    # docker container name.
    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
265
266
def setup_remote_debug(host, port, logger):
    """Attach this process to a remote pydevd (PyCharm) debug server.

    A failed attach is never fatal: the adapter continues without remote
    debugging and the problem is only logged.

    :param host: hostname or IP address of the pydevd debug server
    :param port: TCP port the debug server listens on
    :param logger: logger used to report attach failures
    """
    try:
        import sys
        sys.path.append('/voltha/pydevd/pycharm-debug.egg')
        import pydevd
        # Attach immediately but do not suspend execution; breakpoints
        # are managed from the IDE side.
        pydevd.settrace(host, port=port, stdoutToServer=True,
                        stderrToServer=True, suspend=False)

    except ImportError:
        logger.error('Error importing pydevd package')
        logger.error('REMOTE DEBUGGING will not be supported in this run...')
        # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.

    except AttributeError:
        # Fixed garbled message (was "...PYTHONPATH to'pydevd directory
        # and rlogger.errorun again?").
        logger.error('Attribute error. Perhaps try to explicitly set '
                     'PYTHONPATH to the pydevd directory and run again?')
        logger.error('REMOTE DEBUGGING will not be supported in this run...')
        # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.

    except Exception:
        # Narrowed from a bare "except:" so KeyboardInterrupt/SystemExit
        # still propagate; report through the logger instead of a stray
        # print() for consistency with the other handlers.
        import sys
        logger.error("pydevd startup exception: %s" % sys.exc_info()[0])
        logger.error('REMOTE DEBUGGING will not be supported in this run...')
291
292
def load_config(args):
    """Load the adapter's YAML configuration file.

    :param args: parsed command-line args; ``args.config`` is the path of
                 the YAML file.  A path starting with '.' is resolved
                 relative to this module's directory.
    :return: configuration parsed from the YAML file (typically a dict)
    """
    path = args.config
    if path.startswith('.'):
        # Renamed local from "dir" to avoid shadowing the builtin.
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load instead of load: the config is plain data, needs no
        # arbitrary-object construction, and yaml.load() without an
        # explicit Loader is unsafe and deprecated.
        config = yaml.safe_load(fd)
    return config
302
303
def print_banner(log):
    """Log the ASCII-art "ADTRAN ONU Adapter" startup banner, one line
    per log.info() call, followed by a stop hint."""
    banner = (
        " _____ _______ _____ _ _ ____ _ _______ ",
        " /\ | __ \__ __| __ \ /\ | \ | | / __ \| \ | | | | |",
        " / \ | | | | | | | |__) | / \ | \| | | | | | \| | | | |",
        " / /\ \ | | | | | | | _ / / /\ \ | . ` | | | | | . ` | | | |",
        " / ____ \| |__| | | | | | \ \ / ____ \| |\ | | |__| | |\ | |__| |",
        " /_/ \_\_____/ |_| |_| _\_\/_/ \_\_| \_| \____/|_| \_|\____/ ",
        " /\ | | | | ",
        " / \ __| | __ _ _ __ | |_ ___ _ __ ",
        " / /\ \ / _` |/ _` | '_ \| __/ _ \ '__| ",
        " / ____ \ (_| | (_| | |_) | || __/ | ",
        " /_/ \_\__,_|\__,_| .__/ \__\___|_| ",
        " | | ",
        " |_| ",
        "",
    )
    for line in banner:
        log.info(line)
    log.info('(to stop: press Ctrl-C)')
320
321
@implementer(IComponent)
class Main(object):
    """Adtran ONU adapter container entry point.

    Wires the adapter into VOLTHA over Kafka: sets up logging, optional
    remote debugging, Kafka cluster/adapter proxies, the adapter request
    facade, core registration and periodic heartbeats, then (via
    ``start()``) runs the Twisted reactor until interrupted.
    """

    def __init__(self):
        try:
            self.args = args = parse_args()
            self.config = load_config(args)

            # Net verbosity: each -v raises the log level, each -q lowers it.
            verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
            self.log = setup_logging(self.config.get('logging', {}),
                                     args.instance_id,
                                     verbosity_adjust=verbosity_adjust)
            self.log.info('container-number-extractor',
                          regex=args.container_name_regex)

            if args.debug_enabled:
                setup_remote_debug(args.debug_host, args.debug_port, self.log)

            self.adtran_onu_adapter_version = self.get_version()
            self.log.info('ADTRAN-ONU-Adapter-Version', version=self.adtran_onu_adapter_version)

            if not args.no_banner:
                print_banner(self.log)

            # Populated later by startup_components().
            self.adapter = None
            self.core_proxy = None
            self.adapter_proxy = None

            # Create a unique instance id using the passed-in instance id and
            # UTC timestamp
            current_time = arrow.utcnow().timestamp
            self.instance_id = self.args.instance_id + '_' + str(current_time)

            self.core_topic = args.core_topic
            self.listening_topic = args.name
            self.startup_components()

            if not args.no_heartbeat:
                self.start_heartbeat()
                self.start_kafka_cluster_heartbeat(self.instance_id)

        except Exception as e:
            # NOTE(review): if parse_args()/load_config()/setup_logging()
            # itself failed, self.log does not exist yet and this handler
            # raises AttributeError instead of logging — consider a
            # fallback logger.
            self.log.exception('unhandled-exception', e=e)
            # Keep the container alive ~60s after a startup failure,
            # presumably so logs can be collected before the orchestrator
            # restarts it — TODO confirm intent.
            for tick in xrange(0, 30):
                import time
                time.sleep(2)

    def get_version(self):
        """Read and validate the adapter version string.

        The path comes from defs['version_file']; a relative path is
        resolved against this module's directory.  ``Version`` raises if
        the file content is not a valid version string.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        """Run the Twisted reactor; blocks until keyboard interrupt."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        # Component teardown happens in shutdown_components() via the
        # reactor shutdown trigger; nothing to do here.
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Build the AdapterConfig protobuf (currently all defaults)."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Start internal components and register with the VOLTHA core.

        Order matters: the kafka cluster proxy must be started before the
        messaging proxy, and the core/adapter proxies are created first
        with kafka_proxy=None, then patched once the messaging proxy
        exists.  Any failure is logged but not re-raised.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = AdtranOnuAdapter(core_proxy=self.core_proxy,
                                            adapter_proxy=self.adapter_proxy,
                                            config=config)

            adtran_request_handler = AdapterRequestFacade(adapter=self.adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=adtran_request_handler
                )
            ).start()

            # The messaging proxy only exists after .start(); patch it into
            # the proxies created above.
            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        # Stop components in reverse registration order.
        for component in reversed(registry.iterate()):
            yield component.stop()

        # Join any remaining daemon threads so shutdown is clean.
        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Run the Twisted reactor with shutdown_components() hooked in
        as a pre-shutdown trigger; blocks until the reactor stops."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter with the VOLTHA core.

        :param retries: number of attempts after a timeout; negative means
                        retry forever.  Re-raises on non-timeout errors or
                        when retries are exhausted.
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)
                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative retries = infinite; otherwise count down.
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Emit a debug-level 'up' heartbeat log line every 10 seconds."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic. The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            # Refresh timestamp/uptime on every tick and publish as JSON.
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception, err:
                self.log.exception('failed-sending-message-heartbeat', e=err)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception, e:
            self.log.exception('failed-kafka-heartbeat', e=e)
562
563
if __name__ == '__main__':
    # Build the adapter daemon and hand control to the Twisted reactor.
    adapter_main = Main()
    adapter_main.start()