#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""OpenONU Adapter main entry point"""

import argparse
import os
import time

import arrow
import yaml
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from pyvoltha.common.structlog_setup import setup_logging, update_logging
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.deferred_utils import TimeOutError
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
    get_my_primary_interface
from pyvoltha.common.utils.registry import registry, IComponent
from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
from pyvoltha.adapters.kafka.core_proxy import CoreProxy
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
    get_messaging_proxy
from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from voltha_protos.adapter_pb2 import AdapterConfig

from brcm_openomci_onu import BrcmOpenomciOnuAdapter

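# Default runtime settings.  Each entry can be overridden through the
# environment variable named in its os.environ.get() call, and most can be
# overridden again from the command line (see parse_args below).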
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openonu.yml'),
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openonu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    retry_interval=os.environ.get('RETRY_INTERVAL', 2),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', 'adapters.heartbeat'),
)


def parse_args():
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            '(default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            'updates (default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive on (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'suppress debug and info logs'
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified, the address from the config file is used.'
             % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified, the address from the config file is used.'
             % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    _help = 'topic of events on the kafka bus'
    parser.add_argument('-et', '--event_topic',
                        dest='event_topic',
                        action='store',
                        default=defs['event_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args


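# The YAML file given by --config is loaded as-is; the only keys this module
# consults are 'logging' (handed to setup_logging) and 'kafka-cluster-proxy'
# (handed to KafkaProxy) -- see Main below.  A minimal sketch, assuming
# setup_logging accepts a standard dictConfig-style mapping:
#
#   logging:
#     version: 1
#   kafka-cluster-proxy: {}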
def load_config(args):
    path = args.config
    if path.startswith('.'):
        dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        config = yaml.load(fd)
    return config


def print_banner(log):
    log.info(' ')
    log.info(' OpenOnu Adapter ')
    log.info(' ')
    log.info('(to stop: press Ctrl-C)')


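# Main wires the adapter together: it parses the CLI args, loads the YAML
# config, sets up structured logging, and then (in startup_components)
# registers a KafkaProxy toward the cluster Kafka bus, builds the Core and
# Adapter proxies, starts the BrcmOpenomciOnuAdapter, attaches an
# IKafkaMessagingProxy for core<->adapter messaging, and finally registers
# the adapter with the VOLTHA core, retrying indefinitely.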
@implementer(IComponent)
class Main(object):

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.adapter_version = self.get_version()
        self.log.info('OpenONU-Adapter-Version',
                      version=self.adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.event_topic = args.event_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - an exception will be
        # raised if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                           core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry forever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

    def start_reactor(self):
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        # Register this adapter with the VOLTHA core; a negative retries value
        # means keep retrying indefinitely.
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For the heartbeat we send a message to a dedicated heartbeat topic
        # (defs['heartbeat_topic']).  The message is a simple dict serialized
        # to JSON, not a protocol buffer.
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)


if __name__ == '__main__':
    Main().start()