blob: 902ed0eb4f43fa06abe792eb2090e0dbbe91d824 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000026import SocketServer
27
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050028from packaging.version import Version
29from simplejson import dumps
30from twisted.internet.defer import inlineCallbacks, returnValue
31from twisted.internet.task import LoopingCall
32from zope.interface import implementer
33
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050034from pyvoltha.common.structlog_setup import setup_logging, update_logging
35from pyvoltha.common.utils.asleep import asleep
36from pyvoltha.common.utils.deferred_utils import TimeOutError
37from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
38from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050039 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050040from pyvoltha.common.utils.registry import registry, IComponent
41from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
42from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
43from pyvoltha.adapters.kafka.core_proxy import CoreProxy
44from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050045 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050046from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050047from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050048
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050049from brcm_openomci_onu import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000050from probe import Probe
51
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050052
# Default settings for the adapter.  Most entries can be overridden through
# the environment variable named in the corresponding os.environ.get() call;
# parse_args() then uses these values as the command-line defaults.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string so the regex backslashes are not treated as (invalid)
    # string escapes; the resulting pattern text is unchanged.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    # NOTE(review): when supplied through the environment these two values
    # are strings (so 'False' is truthy) - confirm how consumers read them.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are strings, but this one is handed to asleep() as
    # a delay in Main._register_with_core, so coerce it to a number here.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
    probe=os.environ.get('PROBE', ':8080')
)
76
77
def parse_args():
    """Build the adapter's command-line parser and parse the arguments.

    Value options default to the matching entry of the module-level
    ``defs`` table.  After parsing, if --instance-id-is-container-name was
    given, ``instance_id`` is replaced by get_my_containers_name().

    Returns the populated argparse namespace.
    """

    def with_default(template, key):
        # Fill the single %s slot of a help template from defs[key].
        return template % defs[key]

    def value_option(dest, text):
        # Keyword arguments for an option storing one value.
        return dict(dest=dest, action='store', default=defs[dest], help=text)

    def flag_option(dest, text):
        # Keyword arguments for a boolean switch that is off by default.
        return dict(dest=dest, action='store_true', default=False, help=text)

    def count_option(dest, text):
        # Keyword arguments for a repeatable, counted switch.
        return dict(dest=dest, action='count', help=text)

    # (flags, add_argument keyword arguments), in registration order.
    option_table = [
        (('-c', '--config'),
         value_option('config', with_default(
             'Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.',
             'config'))),
        (('-X', '--container-number-extractor'),
         value_option('container_name_regex', with_default(
             'Regular expression for extracting conatiner number from '
             'container name (default: %s)',
             'container_name_regex'))),
        (('-C', '--consul'),
         value_option('consul', with_default(
             '<hostname>:<port> to consul agent (default: %s)', 'consul'))),
        (('-na', '--name'),
         value_option('name', with_default(
             'name of this adapter (default: %s)', 'name'))),
        (('-ven', '--vendor'),
         value_option('vendor', with_default(
             'vendor of this adapter (default: %s)', 'vendor'))),
        (('-dt', '--device_type'),
         value_option('device_type', with_default(
             'supported device type of this adapter (default: %s)',
             'device_type'))),
        (('-abf', '--accept_bulk_flow'),
         value_option('accept_bulk_flow', with_default(
             'specifies whether the device type accepts bulk flow updates '
             'adapter (default: %s)',
             'accept_bulk_flow'))),
        (('-aaf', '--accept_atomic_flow'),
         value_option('accept_atomic_flow', with_default(
             'specifies whether the device type accepts add/remove flow '
             '(default: %s)',
             'accept_atomic_flow'))),
        (('-e', '--etcd'),
         value_option('etcd', with_default(
             '<hostname>:<port> to etcd server (default: %s)', 'etcd'))),
        (('-i', '--instance-id'),
         value_option('instance_id', with_default(
             'unique string id of this container instance (default: %s)',
             'instance_id'))),
        (('-I', '--interface'),
         value_option('interface', with_default(
             'ETH interface to recieve (default: %s)', 'interface'))),
        (('-n', '--no-banner'),
         flag_option('no_banner', 'omit startup banner log lines')),
        (('-N', '--no-heartbeat'),
         flag_option('no_heartbeat',
                     'do not emit periodic heartbeat log messages')),
        (('-q', '--quiet'),
         count_option('quiet', "suppress debug and info logs")),
        (('-v', '--verbose'),
         count_option('verbose', 'enable verbose logging')),
        (('--instance-id-is-container-name',),
         flag_option('instance_id_is_container_name',
                     'use docker container name as conatiner instance id'
                     ' (overrides -i/--instance-id option)')),
        (('-KA', '--kafka_adapter'),
         value_option('kafka_adapter', with_default(
             '<hostname>:<port> of the kafka adapter broker (default: %s). ('
             'If not '
             'specified (None), the address from the config file is used',
             'kafka_adapter'))),
        (('-KC', '--kafka_cluster'),
         value_option('kafka_cluster', with_default(
             '<hostname>:<port> of the kafka cluster broker (default: %s). ('
             'If not '
             'specified (None), the address from the config file is used',
             'kafka_cluster'))),
        (('-b', '--backend'),
         dict(default=defs['backend'],
              choices=['none', 'consul', 'etcd'],
              help='backend to use for config persitence')),
        (('-ct', '--core_topic'),
         value_option('core_topic', 'topic of core on the kafka bus')),
        (('-et', '--event_topic'),
         value_option('event_topic', 'topic of events on the kafka bus')),
        (('-P', '--probe'),
         value_option('probe', with_default(
             '<hostname>:<port> for liveness and readiness probes '
             '(default: %s)',
             'probe'))),
    ]

    parser = argparse.ArgumentParser()
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    parsed = parser.parse_args()

    # post-processing: the container name takes precedence over -i
    if parsed.instance_id_is_container_name:
        parsed.instance_id = get_my_containers_name()

    return parsed
254
255
def load_config(args):
    """Load the adapter's YAML configuration file.

    ``args`` is the parsed command-line namespace; only ``args.config`` is
    read.  A path starting with '.' is resolved relative to the directory
    containing this file; any other path is made absolute as-is.

    Returns the parsed configuration, or an empty dict when the file holds
    no YAML document (so callers can use ``.get()`` unconditionally).
    Errors from opening or parsing the file propagate to the caller.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load: builds plain Python data only and, unlike a bare
        # yaml.load(fd), does not depend on an implicit default Loader.
        config = yaml.safe_load(fd)
    return config if config is not None else {}
265
266
def print_banner(log):
    """Write the startup banner to ``log``, one info record per line."""
    banner_lines = (
        ' ',
        ' OpenOnu Adapter ',
        ' ',
        '(to stop: press Ctrl-C)',
    )
    for banner_line in banner_lines:
        log.info(banner_line)
272
273
@implementer(IComponent)
class Main(object):
    """Top-level component that wires up and runs the OpenONU adapter.

    Constructing an instance parses the command line, loads the config
    file, sets up logging, launches the asynchronous component startup and,
    unless --no-heartbeat was given, the two heartbeat loops.  ``start()``
    then runs the Twisted reactor.
    """

    def __init__(self):
        self.args = args = parse_args()
        self.config = load_config(args)

        # -v raises and -q lowers verbosity; both counters are None when
        # the option was not given.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version',
                      version=self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.event_topic = args.event_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and validate the adapter version string.

        The version file path comes from ``defs['version_file']``; a path
        not starting with '/' is resolved relative to this file's directory.

        Returns the file content exactly as read.  ``Version`` raises if
        the content is not a valid version string.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # 'with' guarantees the handle is closed even if validation below
        # (or the read itself) raises.
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be
        # raised if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op; present so this object offers a stop() like other
        registered components."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed AdapterConfig for the adapter."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Create, register and start the adapter's internal components.

        Steps, in order: register this object as 'main', start the Kafka
        cluster proxy, build the core/adapter proxies and the adapter,
        start the inter-container Kafka messaging proxy, attach it to both
        proxies, then register with the core (retrying without limit).
        The ``Probe`` class flags are set as each stage completes.

        Any exception is logged here and not re-raised.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()
            Probe.kafka_cluster_proxy_running = True

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is
            # attached below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(
                adapter=self.adapter,
                core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()
            Probe.kafka_adapter_proxy_running = True

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)
            Probe.register_adapter_with_core = True

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): this skips NON-daemon threads, so only daemon
            # threads are joined - looks inverted; confirm the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the startup/shutdown hooks, schedule the probe server on
        a reactor thread and run the reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.callInThread(self.start_probe)
        reactor.run()

    def start_probe(self):
        """Serve the ``Probe`` handler on the --probe address.

        The address has the form ``<host>:<port>``; an empty host (as in
        the default ':8080') is passed through unchanged.  Blocks in
        serve_forever(), so it is run via reactor.callInThread.
        """
        # Split once on the last ':' so host and port come from one parse.
        host, _, port = self.args.probe.rpartition(':')
        # NOTE(review): nothing visible here ever calls server.shutdown();
        # confirm this thread does not hold up reactor shutdown.
        server = SocketServer.TCPServer((host, int(port)), Probe)
        server.serve_forever()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeout.

        ``retries`` is the number of additional attempts allowed after a
        TimeOutError: a negative value retries without limit, 0 re-raises
        on the first timeout.  Between attempts it sleeps for
        ``defs['retry_interval']``.

        Returns (via returnValue) the response from core_proxy.register.
        Any exception other than TimeOutError is logged and re-raised
        immediately.
        """
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # A negative budget is left untouched (unlimited).
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Start a LoopingCall that logs an 'up' status record every 10
        seconds, including the start time and the elapsed uptime."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Start a LoopingCall that publishes a heartbeat every 10 seconds.

        The message (type, adapter name, ``instance_id``, local IPv4,
        timestamp, uptime) is JSON-encoded and sent to
        ``defs['heartbeat_topic']`` through the Kafka cluster proxy.  When
        the proxy is missing or reports faulty, the Probe flag for the
        cluster proxy is cleared and an error is logged instead.
        """
        # For heartbeat we will send a message to a specific
        # "voltha-heartbeat" topic.  The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    # NOTE(review): the flag is never set back to True in
                    # this loop once the proxy recovers - confirm intent.
                    Probe.kafka_cluster_proxy_running = False
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
521
522
if __name__ == '__main__':
    # Script entry point: build the adapter component, then run it.
    adapter_main = Main()
    adapter_main.start()