#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""OpenONU Adapter main entry point"""

import argparse
import os
import time

import arrow
import yaml
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from pyvoltha.common.structlog_setup import setup_logging, update_logging
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.deferred_utils import TimeOutError
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
    get_my_primary_interface
from pyvoltha.common.utils.registry import registry, IComponent
from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
from pyvoltha.adapters.kafka.core_proxy import CoreProxy
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
    get_messaging_proxy
from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from voltha_protos.adapter_pb2 import AdapterConfig

from brcm_openomci_onu import BrcmOpenomciOnuAdapter

defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    retry_interval=os.environ.get('RETRY_INTERVAL', 2),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
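
# Every default above can be overridden through the corresponding environment
# variable before start-up; the values below are illustrative, not defaults:
#   KAFKA_ADAPTER=voltha-kafka:9092 CORE_TOPIC=rwcore NAME=openonu python main.py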


def parse_args():
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            '(default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow updates ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
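
# Example invocation (endpoint values are illustrative, not the defaults):
#   python main.py --kafka_adapter kafka:9092 --kafka_cluster kafka:9092 \
#       --core_topic rwcore --backend etcd --etcd etcd:2379 --verbose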


def load_config(args):
    path = args.config
    if path.startswith('.'):
        dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # Use the safe loader: the config file cannot construct arbitrary
        # python objects, and yaml.load() without an explicit Loader is
        # deprecated.
        config = yaml.load(fd, Loader=yaml.SafeLoader)
    return config
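
# Illustrative outline of openonu.yml (contents below are assumptions; the
# only keys this module reads are 'logging', handed to setup_logging(), and
# 'kafka-cluster-proxy', handed to KafkaProxy()):
#
#   logging: {}              # logging configuration consumed by setup_logging()
#   kafka-cluster-proxy: {}  # optional KafkaProxy settings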


def print_banner(log):
    log.info(' ')
    log.info(' OpenOnu Adapter ')
    log.info(' ')
    log.info('(to stop: press Ctrl-C)')


@implementer(IComponent)
class Main(object):

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version',
                      version=self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)

            openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                           core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()
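
            # At this point the adapter consumes requests on its own kafka
            # topic (args.name, default "openonu") and reaches the core on
            # core_topic (default "rwcore") through the two proxies above.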

            # retry forever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For the heartbeat we publish a JSON message to the heartbeat topic
        # (default "adapters.heartbeat") on the kafka cluster bus.
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']
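
        # Illustrative serialized payload as published by send_msg() below
        # (field values are examples only):
        #   {"type": "heartbeat", "adapter": "openonu",
        #    "instance": "1_1550000000", "ip": "10.0.0.5",
        #    "ts": 1550000000, "uptime": 42.0}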

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)


if __name__ == '__main__':
    Main().start()