blob: fee9cddec50712488ecb6e0f2efc9c0c1b328c6d [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim ONU Adapter main entry point"""
19
20import argparse
khenaidoob9203542018-09-17 22:56:37 -040021import os
22import time
23
khenaidoo91ecfd62018-11-04 17:13:42 -050024import arrow
khenaidoob9203542018-09-17 22:56:37 -040025import yaml
khenaidoo91ecfd62018-11-04 17:13:42 -050026from packaging.version import Version
khenaidoob9203542018-09-17 22:56:37 -040027from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
khenaidoo91ecfd62018-11-04 17:13:42 -050031
William Kurkianfc0dcda2019-04-08 16:54:36 -040032from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
khenaidoob9203542018-09-17 22:56:37 -040037 get_my_primary_interface
William Kurkianfc0dcda2019-04-08 16:54:36 -040038from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo91ecfd62018-11-04 17:13:42 -050043 get_messaging_proxy
William Kurkianfc0dcda2019-04-08 16:54:36 -040044from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
khenaidoofdbad6e2018-11-06 22:26:38 -050045from ponsim_onu import PonSimOnuAdapter
William Kurkianfc0dcda2019-04-08 16:54:36 -040046from voltha_protos.adapter_pb2 import AdapterConfig
khenaidoob9203542018-09-17 22:56:37 -040047
48defs = dict(
49 version_file='./VERSION',
50 config=os.environ.get('CONFIG', './ponsim_onu.yml'),
51 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
52 '0-9]+)\..*$'),
53 consul=os.environ.get('CONSUL', 'localhost:8500'),
54 name=os.environ.get('NAME', 'ponsim_onu'),
55 vendor=os.environ.get('VENDOR', 'Voltha Project'),
56 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
57 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
58 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
59 etcd=os.environ.get('ETCD', 'localhost:2379'),
60 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
61 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
62 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
63 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
64 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
65 backend=os.environ.get('BACKEND', 'none'),
66 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
67 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
68)
69
70
71def parse_args():
72 parser = argparse.ArgumentParser()
73
74 _help = ('Path to ponsim_onu.yml config file (default: %s). '
75 'If relative, it is relative to main.py of ponsim adapter.'
76 % defs['config'])
77 parser.add_argument('-c', '--config',
78 dest='config',
79 action='store',
80 default=defs['config'],
81 help=_help)
82
83 _help = 'Regular expression for extracting conatiner number from ' \
84 'container name (default: %s)' % defs['container_name_regex']
85 parser.add_argument('-X', '--container-number-extractor',
86 dest='container_name_regex',
87 action='store',
88 default=defs['container_name_regex'],
89 help=_help)
90
91 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
92 parser.add_argument('-C', '--consul',
93 dest='consul',
94 action='store',
95 default=defs['consul'],
96 help=_help)
97
98 _help = 'name of this adapter (default: %s)' % defs['name']
99 parser.add_argument('-na', '--name',
100 dest='name',
101 action='store',
102 default=defs['name'],
103 help=_help)
104
105 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
106 parser.add_argument('-ven', '--vendor',
107 dest='vendor',
108 action='store',
109 default=defs['vendor'],
110 help=_help)
111
112 _help = 'supported device type of this adapter (default: %s)' % defs[
113 'device_type']
114 parser.add_argument('-dt', '--device_type',
115 dest='device_type',
116 action='store',
117 default=defs['device_type'],
118 help=_help)
119
120 _help = 'specifies whether the device type accepts bulk flow updates ' \
121 'adapter (default: %s)' % defs['accept_bulk_flow']
122 parser.add_argument('-abf', '--accept_bulk_flow',
123 dest='accept_bulk_flow',
124 action='store',
125 default=defs['accept_bulk_flow'],
126 help=_help)
127
128 _help = 'specifies whether the device type accepts add/remove flow ' \
129 '(default: %s)' % defs['accept_atomic_flow']
130 parser.add_argument('-aaf', '--accept_atomic_flow',
131 dest='accept_atomic_flow',
132 action='store',
133 default=defs['accept_atomic_flow'],
134 help=_help)
135
136 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
137 parser.add_argument('-e', '--etcd',
138 dest='etcd',
139 action='store',
140 default=defs['etcd'],
141 help=_help)
142
143 _help = ('unique string id of this container instance (default: %s)'
144 % defs['instance_id'])
145 parser.add_argument('-i', '--instance-id',
146 dest='instance_id',
147 action='store',
148 default=defs['instance_id'],
149 help=_help)
150
151 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
152 parser.add_argument('-I', '--interface',
153 dest='interface',
154 action='store',
155 default=defs['interface'],
156 help=_help)
157
158 _help = 'omit startup banner log lines'
159 parser.add_argument('-n', '--no-banner',
160 dest='no_banner',
161 action='store_true',
162 default=False,
163 help=_help)
164
165 _help = 'do not emit periodic heartbeat log messages'
166 parser.add_argument('-N', '--no-heartbeat',
167 dest='no_heartbeat',
168 action='store_true',
169 default=False,
170 help=_help)
171
172 _help = "suppress debug and info logs"
173 parser.add_argument('-q', '--quiet',
174 dest='quiet',
175 action='count',
176 help=_help)
177
178 _help = 'enable verbose logging'
179 parser.add_argument('-v', '--verbose',
180 dest='verbose',
181 action='count',
182 help=_help)
183
184 _help = ('use docker container name as conatiner instance id'
185 ' (overrides -i/--instance-id option)')
186 parser.add_argument('--instance-id-is-container-name',
187 dest='instance_id_is_container_name',
188 action='store_true',
189 default=False,
190 help=_help)
191
192 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
193 'If not '
194 'specified (None), the address from the config file is used'
195 % defs['kafka_adapter'])
196 parser.add_argument('-KA', '--kafka_adapter',
197 dest='kafka_adapter',
198 action='store',
199 default=defs['kafka_adapter'],
200 help=_help)
201
202 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
203 'If not '
204 'specified (None), the address from the config file is used'
205 % defs['kafka_cluster'])
206 parser.add_argument('-KC', '--kafka_cluster',
207 dest='kafka_cluster',
208 action='store',
209 default=defs['kafka_cluster'],
210 help=_help)
211
212 _help = 'backend to use for config persitence'
213 parser.add_argument('-b', '--backend',
214 default=defs['backend'],
215 choices=['none', 'consul', 'etcd'],
216 help=_help)
217
218 _help = 'topic of core on the kafka bus'
219 parser.add_argument('-ct', '--core_topic',
220 dest='core_topic',
221 action='store',
222 default=defs['core_topic'],
223 help=_help)
224
225 args = parser.parse_args()
226
227 # post-processing
228
229 if args.instance_id_is_container_name:
230 args.instance_id = get_my_containers_name()
231
232 return args
233
234
235def load_config(args):
236 path = args.config
237 if path.startswith('.'):
238 dir = os.path.dirname(os.path.abspath(__file__))
239 path = os.path.join(dir, path)
240 path = os.path.abspath(path)
241 with open(path) as fd:
242 config = yaml.load(fd)
243 return config
244
245
246def print_banner(log):
247 log.info(' ____ _ ___ _ _ _ _ ')
248 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ')
249 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | \| | | | | ')
250 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ')
251 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ')
252 log.info(' _ _ _ ')
253 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
254 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
255 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
256 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
257 log.info(' |_| ')
258 log.info('(to stop: press Ctrl-C)')
259
260
261@implementer(IComponent)
262class Main(object):
263
264 def __init__(self):
265
266 self.args = args = parse_args()
267 self.config = load_config(args)
268
269 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
270 self.log = setup_logging(self.config.get('logging', {}),
271 args.instance_id,
272 verbosity_adjust=verbosity_adjust)
273 self.log.info('container-number-extractor',
274 regex=args.container_name_regex)
275
276 self.ponsim_olt_adapter_version = self.get_version()
277 self.log.info('Ponsim-ONU-Adapter-Version', version=
278 self.ponsim_olt_adapter_version)
279
280 if not args.no_banner:
281 print_banner(self.log)
282
khenaidoo91ecfd62018-11-04 17:13:42 -0500283 self.adapter = None
khenaidoob9203542018-09-17 22:56:37 -0400284 # Create a unique instance id using the passed-in instance id and
285 # UTC timestamp
286 current_time = arrow.utcnow().timestamp
287 self.instance_id = self.args.instance_id + '_' + str(current_time)
288
289 self.core_topic = args.core_topic
290 self.listening_topic = args.name
291 self.startup_components()
292
293 if not args.no_heartbeat:
294 self.start_heartbeat()
295 self.start_kafka_cluster_heartbeat(self.instance_id)
296
297 def get_version(self):
298 path = defs['version_file']
299 if not path.startswith('/'):
300 dir = os.path.dirname(os.path.abspath(__file__))
301 path = os.path.join(dir, path)
302
303 path = os.path.abspath(path)
304 version_file = open(path, 'r')
305 v = version_file.read()
306
307 # Use Version to validate the version string - exception will be raised
308 # if the version is invalid
309 Version(v)
310
311 version_file.close()
312 return v
313
314 def start(self):
315 self.start_reactor() # will not return except Keyboard interrupt
316
317 def stop(self):
318 pass
319
320 def get_args(self):
321 """Allow access to command line args"""
322 return self.args
323
324 def get_config(self):
325 """Allow access to content of config file"""
326 return self.config
327
328 def _get_adapter_config(self):
329 cfg = AdapterConfig()
330 return cfg
331
332 @inlineCallbacks
333 def startup_components(self):
334 try:
335 self.log.info('starting-internal-components',
336 consul=self.args.consul,
337 etcd=self.args.etcd)
338
339 registry.register('main', self)
340
341 # Update the logger to output the vcore id.
342 self.log = update_logging(instance_id=self.instance_id,
343 vcore_id=None)
344
345 yield registry.register(
346 'kafka_cluster_proxy',
347 KafkaProxy(
348 self.args.consul,
349 self.args.kafka_cluster,
350 config=self.config.get('kafka-cluster-proxy', {})
351 )
352 ).start()
353
354 config = self._get_adapter_config()
355
356 self.core_proxy = CoreProxy(
357 kafka_proxy=None,
khenaidoo54e0ddf2019-02-27 16:21:33 -0500358 default_core_topic=self.core_topic,
khenaidoob9203542018-09-17 22:56:37 -0400359 my_listening_topic=self.listening_topic)
360
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400361 self.adapter_proxy = AdapterProxy(
362 kafka_proxy=None,
363 core_topic=self.core_topic,
364 my_listening_topic=self.listening_topic)
365
khenaidoo91ecfd62018-11-04 17:13:42 -0500366 self.adapter = PonSimOnuAdapter(
367 core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
368 config=config)
khenaidoob9203542018-09-17 22:56:37 -0400369 ponsim_request_handler = AdapterRequestFacade(
khenaidoo54e0ddf2019-02-27 16:21:33 -0500370 adapter=self.adapter, core_proxy=self.core_proxy)
khenaidoob9203542018-09-17 22:56:37 -0400371
372 yield registry.register(
373 'kafka_adapter_proxy',
374 IKafkaMessagingProxy(
375 kafka_host_port=self.args.kafka_adapter,
376 # TODO: Add KV Store object reference
377 kv_store=self.args.backend,
378 default_topic=self.args.name,
khenaidooca301322019-01-09 23:06:32 -0500379 group_id_prefix=self.args.instance_id,
khenaidoob9203542018-09-17 22:56:37 -0400380 target_cls=ponsim_request_handler
381 )
382 ).start()
383
384 self.core_proxy.kafka_proxy = get_messaging_proxy()
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400385 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
khenaidoob9203542018-09-17 22:56:37 -0400386
387 # retry for ever
388 res = yield self._register_with_core(-1)
389
390 self.log.info('started-internal-services')
391
392 except Exception as e:
393 self.log.exception('Failure-to-start-all-components', e=e)
394
395 @inlineCallbacks
396 def shutdown_components(self):
397 """Execute before the reactor is shut down"""
398 self.log.info('exiting-on-keyboard-interrupt')
399 for component in reversed(registry.iterate()):
400 yield component.stop()
401
402 import threading
403 self.log.info('THREADS:')
404 main_thread = threading.current_thread()
405 for t in threading.enumerate():
406 if t is main_thread:
407 continue
408 if not t.isDaemon():
409 continue
410 self.log.info('joining thread {} {}'.format(
411 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
412 t.join()
413
414 def start_reactor(self):
415 from twisted.internet import reactor
416 reactor.callWhenRunning(
417 lambda: self.log.info('twisted-reactor-started'))
418 reactor.addSystemEventTrigger('before', 'shutdown',
419 self.shutdown_components)
420 reactor.run()
421
422 @inlineCallbacks
423 def _register_with_core(self, retries):
khenaidoob9203542018-09-17 22:56:37 -0400424 while 1:
425 try:
khenaidoo91ecfd62018-11-04 17:13:42 -0500426 resp = yield self.core_proxy.register(
427 self.adapter.adapter_descriptor(),
428 self.adapter.device_types())
429 if resp:
430 self.log.info('registered-with-core',
431 coreId=resp.instance_id)
432
khenaidoob9203542018-09-17 22:56:37 -0400433 returnValue(resp)
434 except TimeOutError as e:
435 self.log.warn("timeout-when-registering-with-core", e=e)
436 if retries == 0:
437 self.log.exception("no-more-retries", e=e)
438 raise
439 else:
440 retries = retries if retries < 0 else retries - 1
441 yield asleep(defs['retry_interval'])
442 except Exception as e:
443 self.log.exception("failed-registration", e=e)
444 raise
445
446 def start_heartbeat(self):
447
448 t0 = time.time()
449 t0s = time.ctime(t0)
450
451 def heartbeat():
452 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
453
454 lc = LoopingCall(heartbeat)
455 lc.start(10)
456
457 # Temporary function to send a heartbeat message to the external kafka
458 # broker
459 def start_kafka_cluster_heartbeat(self, instance_id):
460 # For heartbeat we will send a message to a specific "voltha-heartbeat"
461 # topic. The message is a protocol buf
462 # message
463 message = dict(
464 type='heartbeat',
465 adapter=self.args.name,
466 instance=instance_id,
467 ip=get_my_primary_local_ipv4()
468 )
469 topic = defs['heartbeat_topic']
470
471 def send_msg(start_time):
472 try:
473 kafka_cluster_proxy = get_kafka_proxy()
474 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
475 # self.log.debug('kafka-proxy-available')
476 message['ts'] = arrow.utcnow().timestamp
477 message['uptime'] = time.time() - start_time
478 # self.log.debug('start-kafka-heartbeat')
479 kafka_cluster_proxy.send_message(topic, dumps(message))
480 else:
481 self.log.error('kafka-proxy-unavailable')
482 except Exception, e:
483 self.log.exception('failed-sending-message-heartbeat', e=e)
484
485 try:
486 t0 = time.time()
487 lc = LoopingCall(send_msg, t0)
488 lc.start(10)
489 except Exception, e:
490 self.log.exception('failed-kafka-heartbeat', e=e)
491
492
493if __name__ == '__main__':
494 Main().start()