#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""OpenONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
32from common.structlog_setup import setup_logging, update_logging
33from common.utils.asleep import asleep
34from common.utils.deferred_utils import TimeOutError
35from common.utils.dockerhelpers import get_my_containers_name
36from common.utils.nethelpers import get_my_primary_local_ipv4, \
37 get_my_primary_interface
38from voltha.core.registry import registry, IComponent
39from kafka.adapter_proxy import AdapterProxy
40from kafka.adapter_request_facade import AdapterRequestFacade
41from kafka.core_proxy import CoreProxy
42from kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
43 get_messaging_proxy
44from kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from brcm_openomci_onu import BrcmOpenomciOnuAdapter
46from voltha.protos import third_party
47from voltha.protos.adapter_pb2 import AdapterConfig
48
_ = third_party


def _env_flag(name, default):
    """Interpret an environment variable as a boolean flag.

    Returns *default* when the variable is unset; otherwise the usual
    truthy spellings ('1', 'true', 'yes', 'y', 'on') map to True,
    case-insensitively, and anything else to False.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ('1', 'true', 'yes', 'y', 'on')


# Default configuration values; each can be overridden from the environment.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string so the backslashes reach the regex engine untouched;
    # extracts the container number from a docker container name.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    # os.environ.get(..., True) returned the raw *string* whenever the
    # variable was set, so e.g. 'False' evaluated truthy; parse properly.
    accept_bulk_flow=_env_flag('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=_env_flag('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # asleep() needs a number; the environment always supplies strings.
    retry_interval=int(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
72
73
def parse_args():
    """Build the command line parser and return the parsed arguments.

    Defaults come from the ``defs`` dict (i.e. from the environment or the
    built-in fallbacks).  When --instance-id-is-container-name is given,
    the instance id is replaced by the docker container's name.
    """
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    # typo fix: "conatiner" -> "container"
    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            'adapter (default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    # typo fix: "recieve" -> "receive"
    _help = 'ETH interface to receive (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    # typo fix: "conatiner" -> "container"
    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    # help text rebalanced: the original had stray, unclosed parentheses
    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    # typo fix: "persitence" -> "persistence"
    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing: optionally derive the instance id from the docker
    # container name (overrides -i/--instance-id).
    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
236
237
def load_config(args):
    """Load and return the YAML config file named by ``args.config``.

    A path starting with '.' is resolved relative to this module's
    directory rather than the current working directory.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    # 'with' guarantees the file handle is closed; safe_load because the
    # config is plain data — yaml.load without a Loader can instantiate
    # arbitrary python objects and is deprecated/unsafe.
    with open(path) as fd:
        config = yaml.safe_load(fd)
    return config
247
248
def print_banner(log):
    """Emit the startup banner lines through the given logger."""
    banner_lines = (
        ' ',
        ' OpenOnu Adapter ',
        ' ',
        '(to stop: press Ctrl-C)',
    )
    for banner_line in banner_lines:
        log.info(banner_line)
254
255
@implementer(IComponent)
class Main(object):
    """Main component for the OpenONU adapter process.

    Parses the command line, loads the YAML config, wires up the Kafka
    proxies and the BRCM OpenOMCI ONU adapter, registers the adapter with
    the VOLTHA core and runs the Twisted reactor until interrupted.
    """

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        # Net verbosity: each -v raises the log level, each -q lowers it.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version', version=
        self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        # Fire-and-forget: startup_components is an inlineCallbacks
        # generator whose Deferred completes once the reactor is running.
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and return the adapter version string from the VERSION file.

        Raises if the file cannot be read or if its content is not a valid
        version string (validated via packaging.version.Version).
        """
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        """Run the Twisted reactor; blocks until the process is stopped."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        # IComponent stop hook; nothing to tear down at this level.
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        # Currently returns a default (empty) AdapterConfig protobuf.
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Bring up internal services: Kafka proxies, the ONU adapter and
        the registration with the VOLTHA core.  Failures are logged, not
        re-raised (the process stays up)."""
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            # Proxy towards the cluster Kafka bus (used for heartbeats).
            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # kafka_proxy is None for now; patched in below once the
            # inter-container messaging proxy exists.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)
            # Facade that dispatches inbound adapter requests to the adapter.
            openonu_request_handler = AdapterRequestFacade(
                adapter=self.adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()

            # Now that the messaging proxy exists, hand it to both proxies.
            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        # Stop registered components in reverse registration order.
        for component in reversed(registry.iterate()):
            yield component.stop()

        # Log and join any remaining daemon threads for diagnostics.
        # NOTE(review): t.join() on a daemon thread can block indefinitely
        # if the thread never exits — confirm this is intended.
        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the shutdown hook and run the Twisted reactor (blocking)."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter instance with the VOLTHA core.

        Retries on timeout every defs['retry_interval'] seconds; a
        negative ``retries`` value means retry forever.  Any non-timeout
        error is re-raised.
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative retries == unlimited; otherwise count down.
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Log a local liveness heartbeat (uptime) every 10 seconds."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic. The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            # Send one heartbeat; failures are logged and never fatal.
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception, e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        # Publish every 10 seconds, tracking uptime from the start time.
        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception, e:
            self.log.exception('failed-kafka-heartbeat', e=e)
486
487
if __name__ == '__main__':
    # Construct the adapter (parses CLI args, loads config, schedules the
    # async startup) and block in the Twisted reactor until interrupted.
    Main().start()