blob: fee9cddec50712488ecb6e0f2efc9c0c1b328c6d [file] [log] [blame]
Scott Baker81739502019-10-14 16:53:10 -07001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim ONU Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
32from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
37 get_my_primary_interface
38from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
43 get_messaging_proxy
44from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from ponsim_onu import PonSimOnuAdapter
46from voltha_protos.adapter_pb2 import AdapterConfig
47
# Built-in defaults for the adapter. Each entry that calls os.environ.get()
# can be overridden by the environment variable named in that call; most of
# them are then used as the defaults of the command line options.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './ponsim_onu.yml'),
    # Raw string: same pattern as before, without relying on Python leaving
    # the unrecognised escape '\.' untouched.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'ponsim_onu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
    # NOTE(review): when supplied through the environment these two flags
    # are strings, so any non-empty value (including 'False') is truthy.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are always strings; convert so the interval can be
    # used directly as a delay in seconds when retrying registration.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
69
70
def parse_args():
    """Parse the adapter's command line options.

    Option defaults come from the module-level ``defs`` dictionary. Help
    texts use argparse's ``%(default)s`` placeholder instead of
    pre-formatting the default into the string: argparse %-formats every
    help string itself, so a literal ``%`` in an environment-derived
    default would otherwise make ``--help`` fail.

    Returns:
        The ``argparse.Namespace`` of parsed options. When
        ``--instance-id-is-container-name`` is given, ``instance_id`` is
        replaced by the value of ``get_my_containers_name()``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help='Path to ponsim_onu.yml config file '
                             '(default: %(default)s). '
                             'If relative, it is relative to main.py of '
                             'ponsim adapter.')

    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help='Regular expression for extracting container '
                             'number from container name '
                             '(default: %(default)s)')

    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help='<hostname>:<port> to consul agent '
                             '(default: %(default)s)')

    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help='name of this adapter (default: %(default)s)')

    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help='vendor of this adapter (default: %(default)s)')

    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help='supported device type of this adapter '
                             '(default: %(default)s)')

    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help='specifies whether the device type accepts '
                             'bulk flow updates (default: %(default)s)')

    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help='specifies whether the device type accepts '
                             'add/remove flow (default: %(default)s)')

    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help='<hostname>:<port> to etcd server '
                             '(default: %(default)s)')

    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help='unique string id of this container instance '
                             '(default: %(default)s)')

    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help='ETH interface to receive '
                             '(default: %(default)s)')

    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help='omit startup banner log lines')

    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help='do not emit periodic heartbeat log messages')

    # 'count' options stay None when the flag is absent; callers must treat
    # None as zero.
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help='suppress debug and info logs')

    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help='enable verbose logging')

    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help='use docker container name as container '
                             'instance id (overrides -i/--instance-id '
                             'option)')

    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help='<hostname>:<port> of the kafka adapter broker '
                             '(default: %(default)s). If not specified '
                             '(None), the address from the config file is '
                             'used')

    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help='<hostname>:<port> of the kafka cluster broker '
                             '(default: %(default)s). If not specified '
                             '(None), the address from the config file is '
                             'used')

    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help='backend to use for config persistence')

    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help='topic of core on the kafka bus')

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
233
234
def load_config(args):
    """Load the YAML configuration file named by ``args.config``.

    A path starting with '.' is resolved against the directory containing
    this file; any other path is made absolute against the current working
    directory.

    Args:
        args: parsed command line options; only ``args.config`` is read.

    Returns:
        The parsed configuration, or an empty dict when the file contains
        no YAML document.

    Raises:
        IOError/OSError: if the file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load builds plain Python types only. yaml.load() without an
        # explicit Loader can construct arbitrary objects and is rejected
        # by recent PyYAML releases.
        config = yaml.safe_load(fd)
    # An empty file parses to None; hand callers a dict so .get() works.
    return config if config is not None else {}
244
245
def print_banner(log):
    """Emit the startup banner, one ``log.info`` call per banner line.

    Args:
        log: any object exposing ``info(message)``.
    """
    # Raw strings keep the backslashes of the ASCII art literal.
    banner_lines = (
        r' ____ _ ___ _ _ _ _ ',
        r'| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ',
        r"| |_) / _ \| '_ \/ __| | '_ ` _ \ | | | | \| | | | | ",
        r'| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ',
        r'|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ',
        r' _ _ _ ',
        r' / \ __| | __ _ _ __ | |_ ___ _ __ ',
        r" / _ \ / _` |/ _` | '_ \| __/ _ \ '__| ",
        r' / ___ \ (_| | (_| | |_) | || __/ | ',
        r'/_/ \_\__,_|\__,_| .__/ \__\___|_| ',
        r' |_| ',
        r'(to stop: press Ctrl-C)',
    )
    for text in banner_lines:
        log.info(text)
259
260
@implementer(IComponent)
class Main(object):
    """Top-level component of the Ponsim ONU adapter process.

    Constructing an instance parses the command line, loads the config
    file, configures logging, begins component startup and, unless
    disabled, starts the heartbeats. ``start()`` then runs the Twisted
    reactor.
    """

    def __init__(self):
        """Parse arguments, set up logging and kick off startup."""
        self.args = args = parse_args()
        self.config = load_config(args)

        # -v/-q are 'count' options and are None when absent.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.ponsim_olt_adapter_version = self.get_version()
        self.log.info('Ponsim-ONU-Adapter-Version',
                      version=self.ponsim_olt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and validate the adapter version string.

        The version file path comes from ``defs['version_file']``; a path
        not starting with '/' is resolved against this file's directory.

        Returns:
            The raw content of the version file.

        Raises:
            packaging.version.InvalidVersion: if the content is not a valid
                version string.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # 'with' closes the file even if reading fails.
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be
        # raised if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op stop hook."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a fresh, default-initialised ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Create and register the adapter's components, then register
        the adapter with the core.

        Any exception raised during startup is logged and swallowed here.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # The proxies are created without a kafka proxy; it is filled
            # in below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = PonSimOnuAdapter(
                core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
                config=config)
            ponsim_request_handler = AdapterRequestFacade(
                adapter=self.adapter, core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=ponsim_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): this skips non-daemon threads, so only daemon
            # threads are joined - confirm that is the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the shutdown hook and run the Twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter with the core, retrying on timeout.

        Args:
            retries: number of retries allowed after a timeout; a negative
                value retries indefinitely.

        Returns (via Deferred):
            The response from ``core_proxy.register``.

        Raises:
            TimeOutError: when a timeout occurs with no retries left.
            Exception: any other registration failure is logged and
                re-raised.
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    # The interval may come from the environment as a
                    # string; make sure the delay is numeric.
                    yield asleep(float(defs['retry_interval']))
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Log an 'up' status line at debug level every 10 seconds."""
        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Publish a heartbeat message to kafka every 10 seconds.

        Args:
            instance_id: value placed in the message's 'instance' field.
        """
        # The heartbeat is a dict, serialised with dumps(), sent to the
        # topic configured in defs['heartbeat_topic'].
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # The same dict is reused; only the timestamp and
                    # uptime change between sends.
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
491
492
# Script entry point: constructing Main performs all setup, start() then
# blocks in the reactor.
if __name__ == '__main__':
    Main().start()