blob: d0eadcba6741d1d1269d9f2f3e152efc9bd9a954 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050032from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050037 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050038from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050043 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050044from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050045from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050046
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050047from brcm_openomci_onu import BrcmOpenomciOnuAdapter
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048
# Default settings for the adapter. Each value can be overridden through the
# environment variable named in the corresponding os.environ.get() call;
# most of them are then used as the defaults of the command line options.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string so the regex escapes (\.) are not treated as (invalid)
    # string escapes.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    # NOTE(review): when set through the environment these two values are
    # strings, so e.g. 'False' is truthy - confirm how consumers read them.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are always strings; coerce so the retry delay is
    # numeric whether it comes from the environment or from the default.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
70
71
def parse_args():
    """Parse the adapter's command line options.

    Every option with a default takes it from the module-level ``defs``
    dictionary. After parsing, ``instance_id`` is replaced by the docker
    container name when ``--instance-id-is-container-name`` is given.

    :return: the ``argparse.Namespace`` holding the parsed options
    """
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            'adapter (default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    # -q and -v are counted; they stay None when the flag is never given.
    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             '(If not specified (None), the address from the config file '
             'is used)' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             '(If not specified (None), the address from the config file '
             'is used)' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing: the container name, when requested, takes precedence
    # over any explicitly supplied instance id.
    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
234
235
def load_config(args):
    """Load the YAML configuration file named by ``args.config``.

    A path starting with '.' is resolved against the directory containing
    this file; the result is then made absolute before being opened.

    :param args: parsed command line options; only ``args.config`` is read
    :return: the parsed content of the YAML document
    """
    path = args.config
    # NOTE(review): only paths beginning with '.' are anchored to this
    # file's directory; other relative paths resolve against the current
    # working directory - confirm this matches the --config help text.
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load builds only plain YAML types; yaml.load() without an
        # explicit Loader can construct arbitrary Python objects and is
        # rejected by newer PyYAML releases.
        config = yaml.safe_load(fd)
    return config
245
246
def print_banner(log):
    """Write the start-up banner via ``log.info``, one call per banner line.

    :param log: logger-like object exposing ``info(message)``
    """
    banner_lines = (
        ' ',
        ' OpenOnu Adapter ',
        ' ',
        '(to stop: press Ctrl-C)',
    )
    for text in banner_lines:
        log.info(text)
252
253
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenONU adapter process.

    Construction parses the command line, loads the configuration, sets up
    logging, starts the internal components and (unless disabled) the
    heartbeats. ``start()`` then runs the Twisted reactor.
    """

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        # -v / -q are counted options that are None when never given.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version',
                      version=self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        # NOTE(review): reads arrow's `timestamp` as an attribute - confirm
        # the pinned arrow version exposes it that way.
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and validate the adapter version string.

        The version file path comes from ``defs['version_file']``; a path
        not starting with '/' is resolved against this file's directory.

        :return: the raw content of the version file
        :raises: whatever ``Version`` raises for an invalid version string
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # The context manager closes the file even if reading fails.
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be
        # raised if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op stop hook."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a freshly constructed, unmodified ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components, then register with
        the core.

        Any exception raised along the way is logged and not propagated.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is filled
            # in below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(
                adapter=self.adapter,
                core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): this skips NON-daemon threads, so only daemon
            # threads are joined - looks inverted; confirm the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the start/shutdown hooks and run the Twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on time-outs.

        :param retries: number of additional attempts after a time-out; a
            negative value retries indefinitely, 0 re-raises on the first
            time-out
        :return: (via ``returnValue``) the response of the register call
        :raises TimeOutError: when the retries are exhausted
        :raises Exception: any other registration failure is logged and
            re-raised immediately
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative counts are left untouched (retry for ever).
                    retries = retries if retries < 0 else retries - 1
                    # The interval may be a string when it was supplied via
                    # the environment; make sure the delay is numeric.
                    yield asleep(float(defs['retry_interval']))
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Log a debug-level 'up' status record every 10 seconds."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Send a heartbeat message to the kafka cluster every 10 seconds.

        :param instance_id: value placed in the message's 'instance' field
        """
        # For heartbeat we will send a message to a specific
        # "voltha-heartbeat" topic. The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
487
488
# Script entry point: constructing Main parses the command line and kicks
# off component start-up; start() then runs the reactor.
if __name__ == '__main__':
    Main().start()