blob: b1e9873c598872233ab5ee3036ec76338e837592 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050025
26import arrow
27import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050028import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050029import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000030
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050031from simplejson import dumps
32from twisted.internet.defer import inlineCallbacks, returnValue
33from twisted.internet.task import LoopingCall
34from zope.interface import implementer
35
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050036from pyvoltha.common.structlog_setup import setup_logging, update_logging
37from pyvoltha.common.utils.asleep import asleep
38from pyvoltha.common.utils.deferred_utils import TimeOutError
39from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
40from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050041 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050042from pyvoltha.common.utils.registry import registry, IComponent
43from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
44from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
45from pyvoltha.adapters.kafka.core_proxy import CoreProxy
46from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050047 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050048from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050049from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050050
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050051from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000052from probe import Probe
53
# Default settings for the adapter. Each value can be overridden through the
# environment variable named in the matching os.environ.get() call, and most
# of them can be overridden again on the command line (see parse_args).
defs = dict(
    build_info_file='./BUILDINFO',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string: the pattern contains regex escapes ('\.') that are invalid
    # escape sequences in an ordinary string literal.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    # NOTE(review): when these two come from the environment they are
    # strings (e.g. 'False'), not booleans -- confirm how consumers
    # interpret them.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are strings; convert so the retry delay is always
    # numeric regardless of where it came from.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
    probe=os.environ.get('PROBE', ':8080')
)
77
78
def parse_args(argv=None):
    """Build the command-line parser and parse the adapter's options.

    Option defaults are taken from the module-level ``defs`` mapping. Help
    texts use argparse's own ``%(default)s`` placeholder so that a default
    containing a ``%`` character cannot break ``--help`` formatting.

    :param argv: optional list of argument strings. When None, argparse
        reads ``sys.argv[1:]``, which is what callers passing no argument
        have always received.
    :return: the populated ``argparse.Namespace``. If
        ``--instance-id-is-container-name`` was given, ``instance_id`` is
        replaced by the value of ``get_my_containers_name()``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help='Path to openonu.yml config file '
                             '(default: %(default)s). '
                             'If relative, it is relative to main.py of '
                             'openonu adapter.')

    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help='Regular expression for extracting container '
                             'number from container name '
                             '(default: %(default)s)')

    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help='<hostname>:<port> to consul agent '
                             '(default: %(default)s)')

    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help='name of this adapter (default: %(default)s)')

    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help='vendor of this adapter (default: %(default)s)')

    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help='supported device type of this adapter '
                             '(default: %(default)s)')

    # NOTE(review): the two flow flags use action='store', so a value given
    # on the command line is kept as a string -- confirm how consumers
    # interpret it.
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help='specifies whether the device type accepts '
                             'bulk flow updates adapter '
                             '(default: %(default)s)')

    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help='specifies whether the device type accepts '
                             'add/remove flow (default: %(default)s)')

    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help='<hostname>:<port> to etcd server '
                             '(default: %(default)s)')

    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help='unique string id of this container instance '
                             '(default: %(default)s)')

    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help='ETH interface to receive '
                             '(default: %(default)s)')

    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help='omit startup banner log lines')

    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help='do not emit periodic heartbeat log messages')

    # 'count' actions have no default here, so an absent flag yields None.
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help="suppress debug and info logs")

    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help='enable verbose logging')

    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help='use docker container name as container '
                             'instance id (overrides -i/--instance-id '
                             'option)')

    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help='<hostname>:<port> of the kafka adapter broker '
                             '(default: %(default)s). (If not specified '
                             '(None), the address from the config file is '
                             'used)')

    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help='<hostname>:<port> of the kafka cluster broker '
                             '(default: %(default)s). (If not specified '
                             '(None), the address from the config file is '
                             'used)')

    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help='backend to use for config persistence')

    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help='topic of core on the kafka bus')

    parser.add_argument('-et', '--event_topic',
                        dest='event_topic',
                        action='store',
                        default=defs['event_topic'],
                        help='topic of events on the kafka bus')

    parser.add_argument('-P', '--probe',
                        dest='probe',
                        action='store',
                        default=defs['probe'],
                        help='<hostname>:<port> for liveness and readiness '
                             'probes (default: %(default)s)')

    args = parser.parse_args(argv)

    # Post-processing: the container name takes precedence over any
    # explicitly supplied instance id.
    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
254
255
def load_config(args):
    """Read and parse the adapter's YAML configuration file.

    A path beginning with '.' is resolved against the directory containing
    this module; any other path is passed through ``os.path.abspath``.

    :param args: parsed command-line arguments; only ``args.config`` is read.
    :return: the deserialized content of the YAML file.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load instead of load: it needs no explicit Loader argument
        # and never constructs arbitrary Python objects from the file.
        config = yaml.safe_load(fd)
    return config
265
266
def get_build_info():
    """Collect build metadata from the BUILDINFO ini file.

    The file location comes from ``defs['build_info_file']``; unless it
    starts with '/', it is resolved against this module's directory. Each
    field is read from the ``buildinfo`` section with 'unknown' as fallback.

    :return: a ``types.SimpleNamespace`` with the attributes ``version``,
        ``vcs_ref``, ``vcs_dirty`` and ``build_time``.
    """
    info_path = defs['build_info_file']
    if not info_path.startswith('/'):
        module_dir = os.path.dirname(os.path.abspath(__file__))
        info_path = os.path.join(module_dir, info_path)

    parser = configparser.ConfigParser()
    parser.read(os.path.abspath(info_path))

    field_names = ('version', 'vcs_ref', 'vcs_dirty', 'build_time')
    values = {
        field: parser.get('buildinfo', field, fallback='unknown')
        for field in field_names
    }
    return types.SimpleNamespace(**values)
282
283
def print_banner(log):
    """Emit the ASCII-art startup banner, one ``log.info`` call per line.

    :param log: any object exposing an ``info(message)`` method.
    """
    # Raw strings: the art contains backslashes that would otherwise be
    # read as (invalid) escape sequences.
    banner = (
        r' ___________ _____ _ _ _____ _ _ _ _ ',
        r' | _ | ___ \ ___| \ | | _ | \ | | | | | ',
        r' | | | | |_/ / |__ | \| | | | | \| | | | | ',
        r' | | | | __/| __|| . ` | | | | . ` | | | | ',
        r' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ',
        r' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ',
        r' ',
    )
    for line in banner:
        log.info(line)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500292
293
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenONU adapter process.

    Construction parses the command line, loads the config file, sets up
    logging, kicks off the internal components (kafka proxies, core and
    adapter proxies, the ONU adapter itself) and, unless disabled, starts
    the heartbeat loops. ``start()`` then runs the twisted reactor.
    """

    def __init__(self):
        self.args = args = parse_args()
        self.config = load_config(args)

        # log levels in python are:
        # 1 - DEBUG => verbosity_adjust = 0
        # 2 - INFO => verbosity_adjust = 1
        # 3 - WARNING => verbosity_adjust = 2
        # 4 - ERROR
        # 5 - CRITICAL
        # If no flags are set we want to stick with INFO,
        # if verbose is set we want to go down to DEBUG,
        # if quiet is set we want to go up to WARNING.
        # Setting both makes no sense and lands back at INFO.
        verbosity_adjust = 1 - (args.verbose or 0) + (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.build_info = get_build_info()
        self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # the current epoch time in whole seconds. time.time() is used
        # because arrow's ``timestamp`` is a property in some releases and a
        # method in others.
        current_time = int(time.time())
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = str(args.core_topic)
        self.event_topic = str(args.event_topic)
        self.listening_topic = str(args.name)
        # Decorated with inlineCallbacks: this call starts the sequence and
        # returns without waiting for it to finish.
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def start(self):
        """Run the reactor; does not return until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed ``AdapterConfig``."""
        return AdapterConfig()

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components in order.

        Starts the kafka cluster proxy, builds the core/adapter proxies and
        the ONU adapter, starts the kafka adapter messaging proxy, then
        registers with the core (retrying indefinitely on timeouts). The
        ``Probe`` class flags are set as each stage completes. Any
        exception is logged and not propagated.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()
            Probe.kafka_cluster_proxy_running = True

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is filled
            # in below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config,
                build_info=self.build_info)

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(
                adapter=self.adapter,
                core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()
            Probe.kafka_adapter_proxy_running = True

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)
            Probe.register_adapter_with_core = True

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        # Materialize first so reversal works whatever iterable type the
        # registry hands back.
        for component in reversed(list(registry.iterate())):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): this skips non-daemon threads and joins only
            # daemon ones, which looks inverted -- kept as-is because
            # flipping it could change shutdown behaviour; please confirm.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Wire up reactor hooks, launch the probe server and run the reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        # serve_forever() blocks, so the probe server gets its own thread.
        reactor.callInThread(self.start_probe)
        reactor.run()

    def start_probe(self):
        """Serve ``Probe`` requests on the ``--probe`` address; blocks forever."""
        # Split on the last ':' so ':8080' yields an empty host and a value
        # without any ':' is treated as a bare port.
        host, _, port = self.args.probe.rpartition(':')
        server = socketserver.TCPServer((host, int(port)), Probe)
        server.serve_forever()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeouts.

        :param retries: number of additional attempts after a timeout.
            A negative value retries forever; 0 re-raises the first
            ``TimeOutError``.
        :return: (via ``returnValue``) the response of
            ``core_proxy.register``.
        :raises TimeOutError: when the retries are exhausted.
        :raises Exception: any other failure is logged and re-raised.
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative counts are left untouched: retry forever.
                    retries = retries if retries < 0 else retries - 1
                    # The interval may be a string when it came from the
                    # environment; make sure the delay is numeric.
                    yield asleep(float(defs['retry_interval']))
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Log a debug heartbeat with the uptime every 10 seconds."""
        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Publish a heartbeat message to the kafka cluster every 10 seconds.

        :param instance_id: value placed in the message's ``instance`` field.
        """
        # The heartbeat goes to the topic configured in
        # defs['heartbeat_topic'] as a JSON-serialized dict.
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # Epoch seconds via time.time(); see __init__ for why
                    # arrow's timestamp attribute is avoided.
                    message['ts'] = int(time.time())
                    message['uptime'] = time.time() - start_time
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    Probe.kafka_cluster_proxy_running = False
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
536
# Executed as a script: construct the adapter, then hand control to it.
if __name__ == '__main__':
    adapter_main = Main()
    adapter_main.start()