#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

18"""OpenONU Adapter main entry point"""
19
import argparse
import os
import time

import arrow
import yaml
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from pyvoltha.common.structlog_setup import setup_logging, update_logging
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.deferred_utils import TimeOutError
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
    get_my_primary_interface
from pyvoltha.common.utils.registry import registry, IComponent
from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
from pyvoltha.adapters.kafka.core_proxy import CoreProxy
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
    get_messaging_proxy
from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from pyvoltha.protos import third_party
from pyvoltha.protos.adapter_pb2 import AdapterConfig

from brcm_openomci_onu import BrcmOpenomciOnuAdapter

_ = third_party

defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    retry_interval=os.environ.get('RETRY_INTERVAL', 2),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
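# Each default above can be overridden from the environment before the CLI
# flags are applied, for example (values are illustrative only):
#
#   KAFKA_ADAPTER=kafka.voltha.svc:9092 CORE_TOPIC=rwcore python main.py
#
# Note that values supplied through the environment arrive as strings, so
# ACCEPT_BULK_FLOW/ACCEPT_ATOMIC_FLOW are real booleans only when the
# hard-coded defaults are used.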


def parse_args():
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of the openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = ('Regular expression for extracting container number from '
             'container name (default: %s)' % defs['container_name_regex'])
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = ('supported device type of this adapter (default: %s)'
             % defs['device_type'])
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = ('specifies whether the device type accepts bulk flow updates '
             '(default: %s)' % defs['accept_bulk_flow'])
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = ('specifies whether the device type accepts add/remove flow '
             'updates (default: %s)' % defs['accept_atomic_flow'])
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive on (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args

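# Example invocation using a handful of the flags defined above (addresses are
# illustrative only):
#
#   python main.py -KA kafka-adapter:9092 -KC kafka-cluster:9092 -ct rwcore -v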

def load_config(args):
    path = args.config
    if path.startswith('.'):
        dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        config = yaml.safe_load(fd)
    return config
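# A minimal sketch of the YAML this adapter expects at --config; only the two
# keys actually read further down ('logging' for setup_logging and
# 'kafka-cluster-proxy' for KafkaProxy) are shown, with placeholder contents
# rather than the shipped openonu.yml:
#
#   logging:
#     version: 1
#   kafka-cluster-proxy: {}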


def print_banner(log):
    log.info(' ')
    log.info(' OpenOnu Adapter ')
    log.info(' ')
    log.info('(to stop: press Ctrl-C)')


@implementer(IComponent)
class Main(object):

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version', version=self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - an exception will be
        # raised if the version is invalid
        Version(v)

        return v

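    # The VERSION file referenced above is expected to hold a single PEP 440
    # version string (e.g. '2.0.0'); packaging.version.Version raises if the
    # contents do not parse.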
    def start(self):
        self.start_reactor()  # will not return except on keyboard interrupt

    def stop(self):
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        cfg = AdapterConfig()
        return cfg

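    # Startup sequence (see startup_components below): register this Main
    # object with the component registry, start the KafkaProxy towards the
    # external kafka cluster, build the core/adapter proxies and the
    # BrcmOpenomciOnuAdapter, start the inter-container messaging proxy, and
    # finally register the adapter with the VOLTHA core, retrying forever.
    # The core/adapter proxies are created with kafka_proxy=None and patched
    # with the messaging proxy once it exists.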
    @inlineCallbacks
    def startup_components(self):
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)

            openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                           core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry forever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        # A negative retry count means retry forever; a positive count is
        # decremented on each timeout until it reaches zero.
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For the heartbeat we periodically publish a JSON message to the
        # configured heartbeat topic (defs['heartbeat_topic'])
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

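        # Example of the JSON document published to the heartbeat topic every
        # 10 seconds (values are illustrative; 'ts' and 'uptime' are filled in
        # by send_msg below):
        #
        #   {"type": "heartbeat", "adapter": "openonu", "instance": "openonu_1",
        #    "ip": "10.0.2.15", "ts": 1549600000, "uptime": 42.0}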
        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)


if __name__ == '__main__':
    Main().start()