blob: e19f320f2a63a81b211f120fcb5b37be6b997202 [file] [log] [blame]
Chip Boling8e042f62019-02-12 16:14:34 -06001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Adtran ONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
32from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
37 get_my_primary_interface
38from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
43 get_messaging_proxy
44from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from adtran_onu import AdtranOnuAdapter
46from pyvoltha.protos import third_party
47from pyvoltha.protos.adapter_pb2 import AdapterConfig
48
49_ = third_party
50
51
52defs = dict(
53 version_file='./VERSION',
54 config=os.environ.get('CONFIG', './adapters-adtran_onu.yml'),
55 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
56 '0-9]+)\..*$'),
57 consul=os.environ.get('CONSUL', 'localhost:8500'),
58 name=os.environ.get('NAME', 'adtran_onu'),
59 vendor=os.environ.get('VENDOR', 'Voltha Project'),
60 device_type=os.environ.get('DEVICE_TYPE', 'adtran_onu'),
61 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
62 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
63 etcd=os.environ.get('ETCD', 'localhost:2379'),
64 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
65 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
66 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
67 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
68 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
69 backend=os.environ.get('BACKEND', 'none'),
70 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
71 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
72
73 # Following are for debugging
74 debug_enabled=True,
75 debug_host='work.bcsw.net',
76 # debug_host='10.0.2.15',
77 debug_port=5678,
78)
79
80
81def parse_args():
82 parser = argparse.ArgumentParser()
83
84 _help = ('Path to adapters-adtran_onu.yml config file (default: %s). '
85 'If relative, it is relative to main.py of Adtran ONU adapter.'
86 % defs['config'])
87 parser.add_argument('-c', '--config',
88 dest='config',
89 action='store',
90 default=defs['config'],
91 help=_help)
92
93 _help = 'Regular expression for extracting container number from ' \
94 'container name (default: %s)' % defs['container_name_regex']
95 parser.add_argument('-X', '--container-number-extractor',
96 dest='container_name_regex',
97 action='store',
98 default=defs['container_name_regex'],
99 help=_help)
100
101 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
102 parser.add_argument('-C', '--consul',
103 dest='consul',
104 action='store',
105 default=defs['consul'],
106 help=_help)
107
108 _help = 'name of this adapter (default: %s)' % defs['name']
109 parser.add_argument('-na', '--name',
110 dest='name',
111 action='store',
112 default=defs['name'],
113 help=_help)
114
115 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
116 parser.add_argument('-ven', '--vendor',
117 dest='vendor',
118 action='store',
119 default=defs['vendor'],
120 help=_help)
121
122 _help = 'supported device type of this adapter (default: %s)' % defs[
123 'device_type']
124 parser.add_argument('-dt', '--device_type',
125 dest='device_type',
126 action='store',
127 default=defs['device_type'],
128 help=_help)
129
130 _help = 'specifies whether the device type accepts bulk flow updates ' \
131 'adapter (default: %s)' % defs['accept_bulk_flow']
132 parser.add_argument('-abf', '--accept_bulk_flow',
133 dest='accept_bulk_flow',
134 action='store',
135 default=defs['accept_bulk_flow'],
136 help=_help)
137
138 _help = 'specifies whether the device type accepts add/remove flow ' \
139 '(default: %s)' % defs['accept_atomic_flow']
140 parser.add_argument('-aaf', '--accept_atomic_flow',
141 dest='accept_atomic_flow',
142 action='store',
143 default=defs['accept_atomic_flow'],
144 help=_help)
145
146 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
147 parser.add_argument('-e', '--etcd',
148 dest='etcd',
149 action='store',
150 default=defs['etcd'],
151 help=_help)
152
153 _help = ('unique string id of this container instance (default: %s)'
154 % defs['instance_id'])
155 parser.add_argument('-i', '--instance-id',
156 dest='instance_id',
157 action='store',
158 default=defs['instance_id'],
159 help=_help)
160
161 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
162 parser.add_argument('-I', '--interface',
163 dest='interface',
164 action='store',
165 default=defs['interface'],
166 help=_help)
167
168 _help = 'omit startup banner log lines'
169 parser.add_argument('-n', '--no-banner',
170 dest='no_banner',
171 action='store_true',
172 default=False,
173 help=_help)
174
175 _help = 'do not emit periodic heartbeat log messages'
176 parser.add_argument('-N', '--no-heartbeat',
177 dest='no_heartbeat',
178 action='store_true',
179 default=False,
180 help=_help)
181
182 _help = "suppress debug and info logs"
183 parser.add_argument('-q', '--quiet',
184 dest='quiet',
185 action='count',
186 help=_help)
187
188 _help = 'enable verbose logging'
189 parser.add_argument('-v', '--verbose',
190 dest='verbose',
191 action='count',
192 help=_help)
193
194 _help = ('use docker container name as container instance id'
195 ' (overrides -i/--instance-id option)')
196 parser.add_argument('--instance-id-is-container-name',
197 dest='instance_id_is_container_name',
198 action='store_true',
199 default=False,
200 help=_help)
201
202 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
203 'If not '
204 'specified (None), the address from the config file is used'
205 % defs['kafka_adapter'])
206 parser.add_argument('-KA', '--kafka_adapter',
207 dest='kafka_adapter',
208 action='store',
209 default=defs['kafka_adapter'],
210 help=_help)
211
212 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
213 'If not '
214 'specified (None), the address from the config file is used'
215 % defs['kafka_cluster'])
216 parser.add_argument('-KC', '--kafka_cluster',
217 dest='kafka_cluster',
218 action='store',
219 default=defs['kafka_cluster'],
220 help=_help)
221
222 _help = 'backend to use for config persistence'
223 parser.add_argument('-b', '--backend',
224 default=defs['backend'],
225 choices=['none', 'consul', 'etcd'],
226 help=_help)
227
228 _help = 'topic of core on the kafka bus'
229 parser.add_argument('-ct', '--core_topic',
230 dest='core_topic',
231 action='store',
232 default=defs['core_topic'],
233 help=_help)
234
235 _help = 'Enable remote python debug'
236 parser.add_argument('-de', '--debug_enabled',
237 dest='debug_enabled',
238 action='store_true',
239 default=defs['debug_enabled'],
240 help=_help)
241
242 _help = 'remote debug hostname or IP address'
243 parser.add_argument('-dh', '--debug_host',
244 dest='debug_host',
245 action='store',
246 default=defs['debug_host'],
247 help=_help)
248
249 _help = 'remote debug port number'
250 parser.add_argument('-dp', '--debug_port',
251 dest='debug_port',
252 action='store',
253 default=defs['debug_port'],
254 help=_help)
255
256 args = parser.parse_args()
257
258 # post-processing
259
260 if args.instance_id_is_container_name:
261 args.instance_id = get_my_containers_name()
262
263 return args
264
265
266def setup_remote_debug(host, port, logger):
267 try:
268 import sys
269 sys.path.append('/voltha/pydevd/pycharm-debug.egg')
270 import pydevd
271 # Initial breakpoint
272
273 pydevd.settrace(host, port=port, stdoutToServer=True, stderrToServer=True, suspend=False)
274
275 except ImportError:
276 logger.error('Error importing pydevd package')
277 logger.error('REMOTE DEBUGGING will not be supported in this run...')
278 # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.
279
280 except AttributeError:
281 logger.error('Attribute error. Perhaps try to explicitly set PYTHONPATH to'
282 'pydevd directory and rlogger.errorun again?')
283 logger.error('REMOTE DEBUGGING will not be supported in this run...')
284 # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.
285
286 except:
287 import sys
288 logger.error("pydevd startup exception: %s" % sys.exc_info()[0])
289 print('REMOTE DEBUGGING will not be supported in this run...')
290
291
292def load_config(args):
293 path = args.config
294 if path.startswith('.'):
295 dir = os.path.dirname(os.path.abspath(__file__))
296 path = os.path.join(dir, path)
297 path = os.path.abspath(path)
298 with open(path) as fd:
299 config = yaml.load(fd)
300 return config
301
302
303def print_banner(log):
304 log.info(" _____ _______ _____ _ _ ____ _ _______ ")
305 log.info(" /\ | __ \__ __| __ \ /\ | \ | | / __ \| \ | | | | |")
306 log.info(" / \ | | | | | | | |__) | / \ | \| | | | | | \| | | | |")
307 log.info(" / /\ \ | | | | | | | _ / / /\ \ | . ` | | | | | . ` | | | |")
308 log.info(" / ____ \| |__| | | | | | \ \ / ____ \| |\ | | |__| | |\ | |__| |")
309 log.info(" /_/ \_\_____/ |_| |_| _\_\/_/ \_\_| \_| \____/|_| \_|\____/ ")
310 log.info(" /\ | | | | ")
311 log.info(" / \ __| | __ _ _ __ | |_ ___ _ __ ")
312 log.info(" / /\ \ / _` |/ _` | '_ \| __/ _ \ '__| ")
313 log.info(" / ____ \ (_| | (_| | |_) | || __/ | ")
314 log.info(" /_/ \_\__,_|\__,_| .__/ \__\___|_| ")
315 log.info(" | | ")
316 log.info(" |_| ")
317 log.info("")
318 log.info('(to stop: press Ctrl-C)')
319
320
321@implementer(IComponent)
322class Main(object):
323
324 def __init__(self):
325
326 self.args = args = parse_args()
327 self.config = load_config(args)
328
329 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
330 self.log = setup_logging(self.config.get('logging', {}),
331 args.instance_id,
332 verbosity_adjust=verbosity_adjust)
333 self.log.info('container-number-extractor',
334 regex=args.container_name_regex)
335
336 if args.debug_enabled:
337 setup_remote_debug(args.debug_host, args.debug_port, self.log)
338
339 self.adtran_onu_adapter_version = self.get_version()
340 self.log.info('ADTRAN-ONU-Adapter-Version', version=self.adtran_onu_adapter_version)
341
342 if not args.no_banner:
343 print_banner(self.log)
344
345 self.adapter = None
346 self.core_proxy = None
347 self.adapter_proxy = None
348
349 # Create a unique instance id using the passed-in instance id and
350 # UTC timestamp
351 current_time = arrow.utcnow().timestamp
352 self.instance_id = self.args.instance_id + '_' + str(current_time)
353
354 self.core_topic = args.core_topic
355 self.listening_topic = args.name
356 self.startup_components()
357
358 if not args.no_heartbeat:
359 self.start_heartbeat()
360 self.start_kafka_cluster_heartbeat(self.instance_id)
361
362 def get_version(self):
363 path = defs['version_file']
364 if not path.startswith('/'):
365 dir = os.path.dirname(os.path.abspath(__file__))
366 path = os.path.join(dir, path)
367
368 path = os.path.abspath(path)
369 version_file = open(path, 'r')
370 v = version_file.read()
371
372 # Use Version to validate the version string - exception will be raised
373 # if the version is invalid
374 Version(v)
375
376 version_file.close()
377 return v
378
379 def start(self):
380 self.start_reactor() # will not return except Keyboard interrupt
381
382 def stop(self):
383 pass
384
385 def get_args(self):
386 """Allow access to command line args"""
387 return self.args
388
389 def get_config(self):
390 """Allow access to content of config file"""
391 return self.config
392
393 def _get_adapter_config(self):
394 cfg = AdapterConfig()
395 return cfg
396
397 @inlineCallbacks
398 def startup_components(self):
399 try:
400 self.log.info('starting-internal-components',
401 consul=self.args.consul,
402 etcd=self.args.etcd)
403
404 registry.register('main', self)
405
406 # Update the logger to output the vcore id.
407 self.log = update_logging(instance_id=self.instance_id,
408 vcore_id=None)
409
410 yield registry.register(
411 'kafka_cluster_proxy',
412 KafkaProxy(
413 self.args.consul,
414 self.args.kafka_cluster,
415 config=self.config.get('kafka-cluster-proxy', {})
416 )
417 ).start()
418
419 config = self._get_adapter_config()
420
421 self.core_proxy = CoreProxy(
422 kafka_proxy=None,
423 core_topic=self.core_topic,
424 my_listening_topic=self.listening_topic)
425
426 self.adapter_proxy = AdapterProxy(
427 kafka_proxy=None,
428 core_topic=self.core_topic,
429 my_listening_topic=self.listening_topic)
430
431 self.adapter = AdtranOnuAdapter(core_proxy=self.core_proxy,
432 adapter_proxy=self.adapter_proxy,
433 config=config)
434
435 adtran_request_handler = AdapterRequestFacade(adapter=self.adapter)
436
437 yield registry.register(
438 'kafka_adapter_proxy',
439 IKafkaMessagingProxy(
440 kafka_host_port=self.args.kafka_adapter,
441 # TODO: Add KV Store object reference
442 kv_store=self.args.backend,
443 default_topic=self.args.name,
444 group_id_prefix=self.args.instance_id,
445 target_cls=adtran_request_handler
446 )
447 ).start()
448
449 self.core_proxy.kafka_proxy = get_messaging_proxy()
450 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
451
452 # retry for ever
453 res = yield self._register_with_core(-1)
454
455 self.log.info('started-internal-services')
456
457 except Exception as e:
458 self.log.exception('Failure-to-start-all-components', e=e)
459
460 @inlineCallbacks
461 def shutdown_components(self):
462 """Execute before the reactor is shut down"""
463 self.log.info('exiting-on-keyboard-interrupt')
464 for component in reversed(registry.iterate()):
465 yield component.stop()
466
467 import threading
468 self.log.info('THREADS:')
469 main_thread = threading.current_thread()
470 for t in threading.enumerate():
471 if t is main_thread:
472 continue
473 if not t.isDaemon():
474 continue
475 self.log.info('joining thread {} {}'.format(
476 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
477 t.join()
478
479 def start_reactor(self):
480 from twisted.internet import reactor
481 reactor.callWhenRunning(
482 lambda: self.log.info('twisted-reactor-started'))
483 reactor.addSystemEventTrigger('before', 'shutdown',
484 self.shutdown_components)
485 reactor.run()
486
487 @inlineCallbacks
488 def _register_with_core(self, retries):
489 while 1:
490 try:
491 resp = yield self.core_proxy.register(
492 self.adapter.adapter_descriptor(),
493 self.adapter.device_types())
494 if resp:
495 self.log.info('registered-with-core',
496 coreId=resp.instance_id)
497 returnValue(resp)
498 except TimeOutError as e:
499 self.log.warn("timeout-when-registering-with-core", e=e)
500 if retries == 0:
501 self.log.exception("no-more-retries", e=e)
502 raise
503 else:
504 retries = retries if retries < 0 else retries - 1
505 yield asleep(defs['retry_interval'])
506 except Exception as e:
507 self.log.exception("failed-registration", e=e)
508 raise
509
510 def start_heartbeat(self):
511
512 t0 = time.time()
513 t0s = time.ctime(t0)
514
515 def heartbeat():
516 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
517
518 lc = LoopingCall(heartbeat)
519 lc.start(10)
520
521 # Temporary function to send a heartbeat message to the external kafka
522 # broker
523 def start_kafka_cluster_heartbeat(self, instance_id):
524 # For heartbeat we will send a message to a specific "voltha-heartbeat"
525 # topic. The message is a protocol buf
526 # message
527 message = dict(
528 type='heartbeat',
529 adapter=self.args.name,
530 instance=instance_id,
531 ip=get_my_primary_local_ipv4()
532 )
533 topic = defs['heartbeat_topic']
534
535 def send_msg(start_time):
536 try:
537 kafka_cluster_proxy = get_kafka_proxy()
538 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
539 # self.log.debug('kafka-proxy-available')
540 message['ts'] = arrow.utcnow().timestamp
541 message['uptime'] = time.time() - start_time
542 # self.log.debug('start-kafka-heartbeat')
543 kafka_cluster_proxy.send_message(topic, dumps(message))
544 else:
545 self.log.error('kafka-proxy-unavailable')
546 except Exception, err:
547 self.log.exception('failed-sending-message-heartbeat', e=err)
548
549 try:
550 t0 = time.time()
551 lc = LoopingCall(send_msg, t0)
552 lc.start(10)
553 except Exception, e:
554 self.log.exception('failed-kafka-heartbeat', e=e)
555
556
557if __name__ == '__main__':
558 Main().start()