#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Adtran OLT Adapter main entry point"""

import argparse
import os
import time

import arrow
import yaml
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from pyvoltha.common.structlog_setup import setup_logging, update_logging
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.deferred_utils import TimeOutError
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
    get_my_primary_interface
from pyvoltha.common.utils.registry import registry, IComponent
from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
from pyvoltha.adapters.kafka.core_proxy import CoreProxy
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
    get_messaging_proxy
from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from adtran_olt import AdtranOltAdapter
from pyvoltha.protos import third_party
from pyvoltha.protos.adapter_pb2 import AdapterConfig

_ = third_party


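# Default configuration; most entries can be overridden via environment
# variables or by the command-line options defined in parse_args() below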
defs = dict(
    version_file='/voltha/VERSION',
    config=os.environ.get('CONFIG', './adapters-adtran-olt.yml'),
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
                                                                      '0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'adtran_olt'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'adtran_olt'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    retry_interval=os.environ.get('RETRY_INTERVAL', 2),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),

    # Following are for debugging
    debug_enabled=True,
    debug_host='work.bcsw.net',
    # debug_host='10.0.2.15',
    debug_port=5678,
)


def parse_args():
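    """Parse command-line arguments; each option defaults to its entry in defs"""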
    parser = argparse.ArgumentParser()

    _help = ('Path to adapters-adtran_olt.yml config file (default: %s). '
             'If relative, it is relative to main.py of Adtran OLT adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            '(default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive on (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file is used'
             % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file is used'
             % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    _help = 'Enable remote python debug'
    parser.add_argument('-de', '--debug_enabled',
                        dest='debug_enabled',
                        action='store_true',
                        default=defs['debug_enabled'],
                        help=_help)

    _help = 'remote debug hostname or IP address'
    parser.add_argument('-dh', '--debug_host',
                        dest='debug_host',
                        action='store',
                        default=defs['debug_host'],
                        help=_help)

    _help = 'remote debug port number'
    parser.add_argument('-dp', '--debug_port',
                        dest='debug_port',
                        action='store',
                        default=defs['debug_port'],
                        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args


def setup_remote_debug(host, port, logger):
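    """Attempt to attach to a remote pydevd debug server; failures are logged and are not fatal"""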
    try:
        import sys
        sys.path.append('/voltha/pydevd/pycharm-debug.egg')
        import pydevd
        # Initial breakpoint

        pydevd.settrace(host, port=port, stdoutToServer=True, stderrToServer=True, suspend=False)

    except ImportError:
        logger.error('Error importing pydevd package')
        logger.error('REMOTE DEBUGGING will not be supported in this run...')
        # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.

    except AttributeError:
        logger.error('Attribute error. Perhaps try to explicitly set PYTHONPATH to '
                     'pydevd directory and run again?')
        logger.error('REMOTE DEBUGGING will not be supported in this run...')
        # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.

    except:
        import sys
        logger.error("pydevd startup exception: %s" % sys.exc_info()[0])
        logger.error('REMOTE DEBUGGING will not be supported in this run...')


def load_config(args):
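    """Load the adapter YAML config file, resolving a relative path against this module's directory"""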
    path = args.config
    if path.startswith('.'):
        dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(dir, path)
    path = os.path.abspath(path)

    with open(path) as fd:
        config = yaml.load(fd)
    return config


def print_banner(log):
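    """Log the ASCII-art startup banner"""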
    log.info(" _____ _______ _____ _ _ ____ _ _______ ")
    log.info(" /\ | __ \__ __| __ \ /\ | \ | | / __ \| | |__ __|")
    log.info(" / \ | | | | | | | |__) | / \ | \| | | | | | | | | ")
    log.info(" / /\ \ | | | | | | | _ / / /\ \ | . ` | | | | | | | | ")
    log.info(" / ____ \| |__| | | | | | \ \ / ____ \| |\ | | |__| | |____| | ")
    log.info(" /_/ \_\_____/ |_| |_| _\_\/_/ \_\_| \_| \____/|______|_| ")
    log.info(" /\ | | | | ")
    log.info(' _ _ _ ')
    log.info(" / \ __| | __ _ _ __ | |_ ___ _ __ ")
    log.info(" / /\ \ / _` |/ _` | '_ \| __/ _ \ '__| ")
    log.info(" / ____ \ (_| | (_| | |_) | || __/ | ")
    log.info(" /_/ \_\__,_|\__,_| .__/ \__\___|_| ")
    log.info(" | | ")
    log.info(" |_| ")
    log.info("")
    log.info('(to stop: press Ctrl-C)')


@implementer(IComponent)
class Main(object):
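    """Adapter container entry point: sets up logging, Kafka proxies, and the Adtran OLT adapter"""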

    def __init__(self):
        try:
            self.args = args = parse_args()
            self.config = load_config(args)

            verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
            self.log = setup_logging(self.config.get('logging', {}),
                                     args.instance_id,
                                     verbosity_adjust=verbosity_adjust)
            self.log.info('container-number-extractor',
                          regex=args.container_name_regex)

            if args.debug_enabled:
                setup_remote_debug(args.debug_host, args.debug_port, self.log)

            self.adtran_olt_adapter_version = self.get_version()
            self.log.info('ADTRAN-OLT-Adapter-Version',
                          version=self.adtran_olt_adapter_version)

            if not args.no_banner:
                print_banner(self.log)

            self.adapter = None

            # Create a unique instance id using the passed-in instance id and
            # UTC timestamp
            current_time = arrow.utcnow().timestamp
            self.instance_id = self.args.instance_id + '_' + str(current_time)

            self.core_topic = args.core_topic
            self.listening_topic = args.name
            self.startup_components()

            if not args.no_heartbeat:
                self.start_heartbeat()
                self.start_kafka_cluster_heartbeat(self.instance_id)

        except Exception as e:
            self.log.exception('unhandled-exception', e=e)
            raise

    def get_version(self):
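        """Read the adapter version string from the VERSION file and validate it with packaging.version"""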
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
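        """Start the Kafka proxies and the adapter, then register the adapter with the VOLTHA core"""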
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = AdtranOltAdapter(core_proxy=self.core_proxy,
                                            adapter_proxy=self.adapter_proxy,
                                            config=config)

            adtran_request_handler = AdapterRequestFacade(adapter=self.adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    # Needs to assign a real class
                    target_cls=adtran_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # Retry forever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
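        """Run the Twisted reactor until interrupted; registered components are stopped on shutdown"""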
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
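        """Register with the VOLTHA core, retrying on timeout; a negative retries value retries forever"""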
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)
                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
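        """Start a LoopingCall that logs an 'up' heartbeat every 10 seconds"""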

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic. The message is a simple JSON dict serialized with simplejson.
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)


if __name__ == '__main__':
    Main().start()