blob: d6418e99a7d82f06c1d598eeb753bbb40925a00e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim ONU Adapter main entry point"""
19
20import argparse
khenaidoob9203542018-09-17 22:56:37 -040021import os
22import time
23
khenaidoo91ecfd62018-11-04 17:13:42 -050024import arrow
khenaidoob9203542018-09-17 22:56:37 -040025import yaml
khenaidoo91ecfd62018-11-04 17:13:42 -050026from packaging.version import Version
khenaidoob9203542018-09-17 22:56:37 -040027from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
khenaidoo91ecfd62018-11-04 17:13:42 -050031
khenaidoofdbad6e2018-11-06 22:26:38 -050032from python.common.structlog_setup import setup_logging, update_logging
33from python.common.utils.asleep import asleep
34from python.common.utils.deferred_utils import TimeOutError
35from python.common.utils.dockerhelpers import get_my_containers_name
36from python.common.utils.nethelpers import get_my_primary_local_ipv4, \
khenaidoob9203542018-09-17 22:56:37 -040037 get_my_primary_interface
khenaidoofdbad6e2018-11-06 22:26:38 -050038from python.common.utils.registry import registry, IComponent
39from python.adapters.kafka.adapter_proxy import AdapterProxy
40from python.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from python.adapters.kafka.core_proxy import CoreProxy
42from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo91ecfd62018-11-04 17:13:42 -050043 get_messaging_proxy
khenaidoofdbad6e2018-11-06 22:26:38 -050044from python.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from ponsim_onu import PonSimOnuAdapter
46from python.protos import third_party
47from python.protos.adapter_pb2 import AdapterConfig
khenaidoob9203542018-09-17 22:56:37 -040048
49_ = third_party
50
51defs = dict(
52 version_file='./VERSION',
53 config=os.environ.get('CONFIG', './ponsim_onu.yml'),
54 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
55 '0-9]+)\..*$'),
56 consul=os.environ.get('CONSUL', 'localhost:8500'),
57 name=os.environ.get('NAME', 'ponsim_onu'),
58 vendor=os.environ.get('VENDOR', 'Voltha Project'),
59 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
60 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
61 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
62 etcd=os.environ.get('ETCD', 'localhost:2379'),
63 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
64 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
65 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
66 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
67 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
68 backend=os.environ.get('BACKEND', 'none'),
69 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
70 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
71)
72
73
74def parse_args():
75 parser = argparse.ArgumentParser()
76
77 _help = ('Path to ponsim_onu.yml config file (default: %s). '
78 'If relative, it is relative to main.py of ponsim adapter.'
79 % defs['config'])
80 parser.add_argument('-c', '--config',
81 dest='config',
82 action='store',
83 default=defs['config'],
84 help=_help)
85
86 _help = 'Regular expression for extracting conatiner number from ' \
87 'container name (default: %s)' % defs['container_name_regex']
88 parser.add_argument('-X', '--container-number-extractor',
89 dest='container_name_regex',
90 action='store',
91 default=defs['container_name_regex'],
92 help=_help)
93
94 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
95 parser.add_argument('-C', '--consul',
96 dest='consul',
97 action='store',
98 default=defs['consul'],
99 help=_help)
100
101 _help = 'name of this adapter (default: %s)' % defs['name']
102 parser.add_argument('-na', '--name',
103 dest='name',
104 action='store',
105 default=defs['name'],
106 help=_help)
107
108 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
109 parser.add_argument('-ven', '--vendor',
110 dest='vendor',
111 action='store',
112 default=defs['vendor'],
113 help=_help)
114
115 _help = 'supported device type of this adapter (default: %s)' % defs[
116 'device_type']
117 parser.add_argument('-dt', '--device_type',
118 dest='device_type',
119 action='store',
120 default=defs['device_type'],
121 help=_help)
122
123 _help = 'specifies whether the device type accepts bulk flow updates ' \
124 'adapter (default: %s)' % defs['accept_bulk_flow']
125 parser.add_argument('-abf', '--accept_bulk_flow',
126 dest='accept_bulk_flow',
127 action='store',
128 default=defs['accept_bulk_flow'],
129 help=_help)
130
131 _help = 'specifies whether the device type accepts add/remove flow ' \
132 '(default: %s)' % defs['accept_atomic_flow']
133 parser.add_argument('-aaf', '--accept_atomic_flow',
134 dest='accept_atomic_flow',
135 action='store',
136 default=defs['accept_atomic_flow'],
137 help=_help)
138
139 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
140 parser.add_argument('-e', '--etcd',
141 dest='etcd',
142 action='store',
143 default=defs['etcd'],
144 help=_help)
145
146 _help = ('unique string id of this container instance (default: %s)'
147 % defs['instance_id'])
148 parser.add_argument('-i', '--instance-id',
149 dest='instance_id',
150 action='store',
151 default=defs['instance_id'],
152 help=_help)
153
154 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
155 parser.add_argument('-I', '--interface',
156 dest='interface',
157 action='store',
158 default=defs['interface'],
159 help=_help)
160
161 _help = 'omit startup banner log lines'
162 parser.add_argument('-n', '--no-banner',
163 dest='no_banner',
164 action='store_true',
165 default=False,
166 help=_help)
167
168 _help = 'do not emit periodic heartbeat log messages'
169 parser.add_argument('-N', '--no-heartbeat',
170 dest='no_heartbeat',
171 action='store_true',
172 default=False,
173 help=_help)
174
175 _help = "suppress debug and info logs"
176 parser.add_argument('-q', '--quiet',
177 dest='quiet',
178 action='count',
179 help=_help)
180
181 _help = 'enable verbose logging'
182 parser.add_argument('-v', '--verbose',
183 dest='verbose',
184 action='count',
185 help=_help)
186
187 _help = ('use docker container name as conatiner instance id'
188 ' (overrides -i/--instance-id option)')
189 parser.add_argument('--instance-id-is-container-name',
190 dest='instance_id_is_container_name',
191 action='store_true',
192 default=False,
193 help=_help)
194
195 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
196 'If not '
197 'specified (None), the address from the config file is used'
198 % defs['kafka_adapter'])
199 parser.add_argument('-KA', '--kafka_adapter',
200 dest='kafka_adapter',
201 action='store',
202 default=defs['kafka_adapter'],
203 help=_help)
204
205 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
206 'If not '
207 'specified (None), the address from the config file is used'
208 % defs['kafka_cluster'])
209 parser.add_argument('-KC', '--kafka_cluster',
210 dest='kafka_cluster',
211 action='store',
212 default=defs['kafka_cluster'],
213 help=_help)
214
215 _help = 'backend to use for config persitence'
216 parser.add_argument('-b', '--backend',
217 default=defs['backend'],
218 choices=['none', 'consul', 'etcd'],
219 help=_help)
220
221 _help = 'topic of core on the kafka bus'
222 parser.add_argument('-ct', '--core_topic',
223 dest='core_topic',
224 action='store',
225 default=defs['core_topic'],
226 help=_help)
227
228 args = parser.parse_args()
229
230 # post-processing
231
232 if args.instance_id_is_container_name:
233 args.instance_id = get_my_containers_name()
234
235 return args
236
237
238def load_config(args):
239 path = args.config
240 if path.startswith('.'):
241 dir = os.path.dirname(os.path.abspath(__file__))
242 path = os.path.join(dir, path)
243 path = os.path.abspath(path)
244 with open(path) as fd:
245 config = yaml.load(fd)
246 return config
247
248
249def print_banner(log):
250 log.info(' ____ _ ___ _ _ _ _ ')
251 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ')
252 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | \| | | | | ')
253 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ')
254 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ')
255 log.info(' _ _ _ ')
256 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
257 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
258 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
259 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
260 log.info(' |_| ')
261 log.info('(to stop: press Ctrl-C)')
262
263
264@implementer(IComponent)
265class Main(object):
266
267 def __init__(self):
268
269 self.args = args = parse_args()
270 self.config = load_config(args)
271
272 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
273 self.log = setup_logging(self.config.get('logging', {}),
274 args.instance_id,
275 verbosity_adjust=verbosity_adjust)
276 self.log.info('container-number-extractor',
277 regex=args.container_name_regex)
278
279 self.ponsim_olt_adapter_version = self.get_version()
280 self.log.info('Ponsim-ONU-Adapter-Version', version=
281 self.ponsim_olt_adapter_version)
282
283 if not args.no_banner:
284 print_banner(self.log)
285
khenaidoo91ecfd62018-11-04 17:13:42 -0500286 self.adapter = None
khenaidoob9203542018-09-17 22:56:37 -0400287 # Create a unique instance id using the passed-in instance id and
288 # UTC timestamp
289 current_time = arrow.utcnow().timestamp
290 self.instance_id = self.args.instance_id + '_' + str(current_time)
291
292 self.core_topic = args.core_topic
293 self.listening_topic = args.name
294 self.startup_components()
295
296 if not args.no_heartbeat:
297 self.start_heartbeat()
298 self.start_kafka_cluster_heartbeat(self.instance_id)
299
300 def get_version(self):
301 path = defs['version_file']
302 if not path.startswith('/'):
303 dir = os.path.dirname(os.path.abspath(__file__))
304 path = os.path.join(dir, path)
305
306 path = os.path.abspath(path)
307 version_file = open(path, 'r')
308 v = version_file.read()
309
310 # Use Version to validate the version string - exception will be raised
311 # if the version is invalid
312 Version(v)
313
314 version_file.close()
315 return v
316
317 def start(self):
318 self.start_reactor() # will not return except Keyboard interrupt
319
320 def stop(self):
321 pass
322
323 def get_args(self):
324 """Allow access to command line args"""
325 return self.args
326
327 def get_config(self):
328 """Allow access to content of config file"""
329 return self.config
330
331 def _get_adapter_config(self):
332 cfg = AdapterConfig()
333 return cfg
334
335 @inlineCallbacks
336 def startup_components(self):
337 try:
338 self.log.info('starting-internal-components',
339 consul=self.args.consul,
340 etcd=self.args.etcd)
341
342 registry.register('main', self)
343
344 # Update the logger to output the vcore id.
345 self.log = update_logging(instance_id=self.instance_id,
346 vcore_id=None)
347
348 yield registry.register(
349 'kafka_cluster_proxy',
350 KafkaProxy(
351 self.args.consul,
352 self.args.kafka_cluster,
353 config=self.config.get('kafka-cluster-proxy', {})
354 )
355 ).start()
356
357 config = self._get_adapter_config()
358
359 self.core_proxy = CoreProxy(
360 kafka_proxy=None,
361 core_topic=self.core_topic,
362 my_listening_topic=self.listening_topic)
363
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400364 self.adapter_proxy = AdapterProxy(
365 kafka_proxy=None,
366 core_topic=self.core_topic,
367 my_listening_topic=self.listening_topic)
368
khenaidoo91ecfd62018-11-04 17:13:42 -0500369 self.adapter = PonSimOnuAdapter(
370 core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
371 config=config)
khenaidoob9203542018-09-17 22:56:37 -0400372 ponsim_request_handler = AdapterRequestFacade(
khenaidoo91ecfd62018-11-04 17:13:42 -0500373 adapter=self.adapter)
khenaidoob9203542018-09-17 22:56:37 -0400374
375 yield registry.register(
376 'kafka_adapter_proxy',
377 IKafkaMessagingProxy(
378 kafka_host_port=self.args.kafka_adapter,
379 # TODO: Add KV Store object reference
380 kv_store=self.args.backend,
381 default_topic=self.args.name,
382 target_cls=ponsim_request_handler
383 )
384 ).start()
385
386 self.core_proxy.kafka_proxy = get_messaging_proxy()
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400387 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
khenaidoob9203542018-09-17 22:56:37 -0400388
389 # retry for ever
390 res = yield self._register_with_core(-1)
391
392 self.log.info('started-internal-services')
393
394 except Exception as e:
395 self.log.exception('Failure-to-start-all-components', e=e)
396
397 @inlineCallbacks
398 def shutdown_components(self):
399 """Execute before the reactor is shut down"""
400 self.log.info('exiting-on-keyboard-interrupt')
401 for component in reversed(registry.iterate()):
402 yield component.stop()
403
404 import threading
405 self.log.info('THREADS:')
406 main_thread = threading.current_thread()
407 for t in threading.enumerate():
408 if t is main_thread:
409 continue
410 if not t.isDaemon():
411 continue
412 self.log.info('joining thread {} {}'.format(
413 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
414 t.join()
415
416 def start_reactor(self):
417 from twisted.internet import reactor
418 reactor.callWhenRunning(
419 lambda: self.log.info('twisted-reactor-started'))
420 reactor.addSystemEventTrigger('before', 'shutdown',
421 self.shutdown_components)
422 reactor.run()
423
424 @inlineCallbacks
425 def _register_with_core(self, retries):
khenaidoob9203542018-09-17 22:56:37 -0400426 while 1:
427 try:
khenaidoo91ecfd62018-11-04 17:13:42 -0500428 resp = yield self.core_proxy.register(
429 self.adapter.adapter_descriptor(),
430 self.adapter.device_types())
431 if resp:
432 self.log.info('registered-with-core',
433 coreId=resp.instance_id)
434
khenaidoob9203542018-09-17 22:56:37 -0400435 returnValue(resp)
436 except TimeOutError as e:
437 self.log.warn("timeout-when-registering-with-core", e=e)
438 if retries == 0:
439 self.log.exception("no-more-retries", e=e)
440 raise
441 else:
442 retries = retries if retries < 0 else retries - 1
443 yield asleep(defs['retry_interval'])
444 except Exception as e:
445 self.log.exception("failed-registration", e=e)
446 raise
447
448 def start_heartbeat(self):
449
450 t0 = time.time()
451 t0s = time.ctime(t0)
452
453 def heartbeat():
454 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
455
456 lc = LoopingCall(heartbeat)
457 lc.start(10)
458
459 # Temporary function to send a heartbeat message to the external kafka
460 # broker
461 def start_kafka_cluster_heartbeat(self, instance_id):
462 # For heartbeat we will send a message to a specific "voltha-heartbeat"
463 # topic. The message is a protocol buf
464 # message
465 message = dict(
466 type='heartbeat',
467 adapter=self.args.name,
468 instance=instance_id,
469 ip=get_my_primary_local_ipv4()
470 )
471 topic = defs['heartbeat_topic']
472
473 def send_msg(start_time):
474 try:
475 kafka_cluster_proxy = get_kafka_proxy()
476 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
477 # self.log.debug('kafka-proxy-available')
478 message['ts'] = arrow.utcnow().timestamp
479 message['uptime'] = time.time() - start_time
480 # self.log.debug('start-kafka-heartbeat')
481 kafka_cluster_proxy.send_message(topic, dumps(message))
482 else:
483 self.log.error('kafka-proxy-unavailable')
484 except Exception, e:
485 self.log.exception('failed-sending-message-heartbeat', e=e)
486
487 try:
488 t0 = time.time()
489 lc = LoopingCall(send_msg, t0)
490 lc.start(10)
491 except Exception, e:
492 self.log.exception('failed-kafka-heartbeat', e=e)
493
494
495if __name__ == '__main__':
496 Main().start()