blob: 5a4918461279e2246347ee667153cf28778a4a11 [file] [log] [blame]
Scott Baker12f1ef82019-10-14 13:06:14 -07001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim OLT Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
32from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
37 get_my_primary_interface
38from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
43 get_messaging_proxy
44from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from ponsim_olt import PonSimOltAdapter
46from voltha_protos.adapter_pb2 import AdapterConfig
47
48defs = dict(
49 version_file='./VERSION',
50 config=os.environ.get('CONFIG', './ponsim_olt.yml'),
51 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
52 '0-9]+)\..*$'),
53 consul=os.environ.get('CONSUL', 'localhost:8500'),
54 name=os.environ.get('NAME', 'ponsim_olt'),
55 vendor=os.environ.get('VENDOR', 'Voltha Project'),
56 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_olt'),
57 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
58 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
59 etcd=os.environ.get('ETCD', 'localhost:2379'),
60 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
61 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
62 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
63 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
64 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
65 backend=os.environ.get('BACKEND', 'none'),
66 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
67 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
68)
69
70
71def parse_args():
72 parser = argparse.ArgumentParser()
73
74 _help = ('Path to ponsim_onu.yml config file (default: %s). '
75 'If relative, it is relative to main.py of ponsim adapter.'
76 % defs['config'])
77 parser.add_argument('-c', '--config',
78 dest='config',
79 action='store',
80 default=defs['config'],
81 help=_help)
82
83 _help = 'Regular expression for extracting conatiner number from ' \
84 'container name (default: %s)' % defs['container_name_regex']
85 parser.add_argument('-X', '--container-number-extractor',
86 dest='container_name_regex',
87 action='store',
88 default=defs['container_name_regex'],
89 help=_help)
90
91 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
92 parser.add_argument('-C', '--consul',
93 dest='consul',
94 action='store',
95 default=defs['consul'],
96 help=_help)
97
98 _help = 'name of this adapter (default: %s)' % defs['name']
99 parser.add_argument('-na', '--name',
100 dest='name',
101 action='store',
102 default=defs['name'],
103 help=_help)
104
105 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
106 parser.add_argument('-ven', '--vendor',
107 dest='vendor',
108 action='store',
109 default=defs['vendor'],
110 help=_help)
111
112 _help = 'supported device type of this adapter (default: %s)' % defs[
113 'device_type']
114 parser.add_argument('-dt', '--device_type',
115 dest='device_type',
116 action='store',
117 default=defs['device_type'],
118 help=_help)
119
120 _help = 'specifies whether the device type accepts bulk flow updates ' \
121 'adapter (default: %s)' % defs['accept_bulk_flow']
122 parser.add_argument('-abf', '--accept_bulk_flow',
123 dest='accept_bulk_flow',
124 action='store',
125 default=defs['accept_bulk_flow'],
126 help=_help)
127
128 _help = 'specifies whether the device type accepts add/remove flow ' \
129 '(default: %s)' % defs['accept_atomic_flow']
130 parser.add_argument('-aaf', '--accept_atomic_flow',
131 dest='accept_atomic_flow',
132 action='store',
133 default=defs['accept_atomic_flow'],
134 help=_help)
135
136 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
137 parser.add_argument('-e', '--etcd',
138 dest='etcd',
139 action='store',
140 default=defs['etcd'],
141 help=_help)
142
143 _help = ('unique string id of this container instance (default: %s)'
144 % defs['instance_id'])
145 parser.add_argument('-i', '--instance-id',
146 dest='instance_id',
147 action='store',
148 default=defs['instance_id'],
149 help=_help)
150
151 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
152 parser.add_argument('-I', '--interface',
153 dest='interface',
154 action='store',
155 default=defs['interface'],
156 help=_help)
157
158 _help = 'omit startup banner log lines'
159 parser.add_argument('-n', '--no-banner',
160 dest='no_banner',
161 action='store_true',
162 default=False,
163 help=_help)
164
165 _help = 'do not emit periodic heartbeat log messages'
166 parser.add_argument('-N', '--no-heartbeat',
167 dest='no_heartbeat',
168 action='store_true',
169 default=False,
170 help=_help)
171
172 _help = "suppress debug and info logs"
173 parser.add_argument('-q', '--quiet',
174 dest='quiet',
175 action='count',
176 help=_help)
177
178 _help = 'enable verbose logging'
179 parser.add_argument('-v', '--verbose',
180 dest='verbose',
181 action='count',
182 help=_help)
183
184 _help = ('use docker container name as conatiner instance id'
185 ' (overrides -i/--instance-id option)')
186 parser.add_argument('--instance-id-is-container-name',
187 dest='instance_id_is_container_name',
188 action='store_true',
189 default=False,
190 help=_help)
191
192 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
193 'If not '
194 'specified (None), the address from the config file is used'
195 % defs['kafka_adapter'])
196 parser.add_argument('-KA', '--kafka_adapter',
197 dest='kafka_adapter',
198 action='store',
199 default=defs['kafka_adapter'],
200 help=_help)
201
202 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
203 'If not '
204 'specified (None), the address from the config file is used'
205 % defs['kafka_cluster'])
206 parser.add_argument('-KC', '--kafka_cluster',
207 dest='kafka_cluster',
208 action='store',
209 default=defs['kafka_cluster'],
210 help=_help)
211
212 _help = 'backend to use for config persitence'
213 parser.add_argument('-b', '--backend',
214 default=defs['backend'],
215 choices=['none', 'consul', 'etcd'],
216 help=_help)
217
218 _help = 'topic of core on the kafka bus'
219 parser.add_argument('-ct', '--core_topic',
220 dest='core_topic',
221 action='store',
222 default=defs['core_topic'],
223 help=_help)
224
225 args = parser.parse_args()
226
227 # post-processing
228
229 if args.instance_id_is_container_name:
230 args.instance_id = get_my_containers_name()
231
232 return args
233
234
235def load_config(args):
236 path = args.config
237 if path.startswith('.'):
238 dir = os.path.dirname(os.path.abspath(__file__))
239 path = os.path.join(dir, path)
240 path = os.path.abspath(path)
241 with open(path) as fd:
242 config = yaml.load(fd)
243 return config
244
245
246def print_banner(log):
247 log.info(' ____ _ ___ _ _____ ')
248 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| | |_ _| ')
249 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | | | | ')
250 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |___| | ')
251 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_____|_| ')
252 log.info(' ')
253 log.info(' _ _ _ ')
254 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
255 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
256 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
257 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
258 log.info(' |_| ')
259 log.info('(to stop: press Ctrl-C)')
260
261
262@implementer(IComponent)
263class Main(object):
264
265 def __init__(self):
266
267 self.args = args = parse_args()
268 self.config = load_config(args)
269
270 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
271 self.log = setup_logging(self.config.get('logging', {}),
272 args.instance_id,
273 verbosity_adjust=verbosity_adjust)
274 self.log.info('container-number-extractor',
275 regex=args.container_name_regex)
276
277 self.ponsim_olt_adapter_version = self.get_version()
278 self.log.info('Ponsim-OLT-Adapter-Version', version=
279 self.ponsim_olt_adapter_version)
280
281 if not args.no_banner:
282 print_banner(self.log)
283
284 self.adapter = None
285 # Create a unique instance id using the passed-in instance id and
286 # UTC timestamp
287 current_time = arrow.utcnow().timestamp
288 self.instance_id = self.args.instance_id + '_' + str(current_time)
289
290 self.core_topic = args.core_topic
291 self.listening_topic = args.name
292 self.startup_components()
293
294 if not args.no_heartbeat:
295 self.start_heartbeat()
296 self.start_kafka_cluster_heartbeat(self.instance_id)
297
298 def get_version(self):
299 path = defs['version_file']
300 if not path.startswith('/'):
301 dir = os.path.dirname(os.path.abspath(__file__))
302 path = os.path.join(dir, path)
303
304 path = os.path.abspath(path)
305 version_file = open(path, 'r')
306 v = version_file.read()
307
308 # Use Version to validate the version string - exception will be raised
309 # if the version is invalid
310 Version(v)
311
312 version_file.close()
313 return v
314
315 def start(self):
316 self.start_reactor() # will not return except Keyboard interrupt
317
318 def stop(self):
319 pass
320
321 def get_args(self):
322 """Allow access to command line args"""
323 return self.args
324
325 def get_config(self):
326 """Allow access to content of config file"""
327 return self.config
328
329 def _get_adapter_config(self):
330 cfg = AdapterConfig()
331 return cfg
332
333 @inlineCallbacks
334 def startup_components(self):
335 try:
336 self.log.info('starting-internal-components',
337 consul=self.args.consul,
338 etcd=self.args.etcd)
339
340 registry.register('main', self)
341
342 # Update the logger to output the vcore id.
343 self.log = update_logging(instance_id=self.instance_id,
344 vcore_id=None)
345
346 yield registry.register(
347 'kafka_cluster_proxy',
348 KafkaProxy(
349 self.args.consul,
350 self.args.kafka_cluster,
351 config=self.config.get('kafka-cluster-proxy', {})
352 )
353 ).start()
354
355 config = self._get_adapter_config()
356
357 self.core_proxy = CoreProxy(
358 kafka_proxy=None,
359 default_core_topic=self.core_topic,
360 my_listening_topic=self.listening_topic)
361
362 self.adapter_proxy = AdapterProxy(
363 kafka_proxy=None,
364 core_topic=self.core_topic,
365 my_listening_topic=self.listening_topic)
366
367 self.adapter = PonSimOltAdapter(core_proxy=self.core_proxy,
368 adapter_proxy=self.adapter_proxy,
369 config=config)
370
371 ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter,
372 core_proxy=self.core_proxy)
373
374 yield registry.register(
375 'kafka_adapter_proxy',
376 IKafkaMessagingProxy(
377 kafka_host_port=self.args.kafka_adapter,
378 # TODO: Add KV Store object reference
379 kv_store=self.args.backend,
380 default_topic=self.args.name,
381 group_id_prefix=self.args.instance_id,
382 target_cls=ponsim_request_handler
383 )
384 ).start()
385
386 self.core_proxy.kafka_proxy = get_messaging_proxy()
387 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
388
389 # retry for ever
390 res = yield self._register_with_core(-1)
391
392 self.log.info('started-internal-services')
393
394 except Exception as e:
395 self.log.exception('Failure-to-start-all-components', e=e)
396
397 @inlineCallbacks
398 def shutdown_components(self):
399 """Execute before the reactor is shut down"""
400 self.log.info('exiting-on-keyboard-interrupt')
401 for component in reversed(registry.iterate()):
402 yield component.stop()
403
404 import threading
405 self.log.info('THREADS:')
406 main_thread = threading.current_thread()
407 for t in threading.enumerate():
408 if t is main_thread:
409 continue
410 if not t.isDaemon():
411 continue
412 self.log.info('joining thread {} {}'.format(
413 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
414 t.join()
415
416 def start_reactor(self):
417 from twisted.internet import reactor
418 reactor.callWhenRunning(
419 lambda: self.log.info('twisted-reactor-started'))
420 reactor.addSystemEventTrigger('before', 'shutdown',
421 self.shutdown_components)
422 reactor.run()
423
424 @inlineCallbacks
425 def _register_with_core(self, retries):
426 while 1:
427 try:
428 resp = yield self.core_proxy.register(
429 self.adapter.adapter_descriptor(),
430 self.adapter.device_types())
431 if resp:
432 self.log.info('registered-with-core',
433 coreId=resp.instance_id)
434 returnValue(resp)
435 except TimeOutError as e:
436 self.log.warn("timeout-when-registering-with-core", e=e)
437 if retries == 0:
438 self.log.exception("no-more-retries", e=e)
439 raise
440 else:
441 retries = retries if retries < 0 else retries - 1
442 yield asleep(defs['retry_interval'])
443 except Exception as e:
444 self.log.exception("failed-registration", e=e)
445 raise
446
447 def start_heartbeat(self):
448
449 t0 = time.time()
450 t0s = time.ctime(t0)
451
452 def heartbeat():
453 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
454
455 lc = LoopingCall(heartbeat)
456 lc.start(10)
457
458 # Temporary function to send a heartbeat message to the external kafka
459 # broker
460 def start_kafka_cluster_heartbeat(self, instance_id):
461 # For heartbeat we will send a message to a specific "voltha-heartbeat"
462 # topic. The message is a protocol buf
463 # message
464 message = dict(
465 type='heartbeat',
466 adapter=self.args.name,
467 instance=instance_id,
468 ip=get_my_primary_local_ipv4()
469 )
470 topic = defs['heartbeat_topic']
471
472 def send_msg(start_time):
473 try:
474 kafka_cluster_proxy = get_kafka_proxy()
475 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
476 # self.log.debug('kafka-proxy-available')
477 message['ts'] = arrow.utcnow().timestamp
478 message['uptime'] = time.time() - start_time
479 # self.log.debug('start-kafka-heartbeat')
480 kafka_cluster_proxy.send_message(topic, dumps(message))
481 else:
482 self.log.error('kafka-proxy-unavailable')
483 except Exception, e:
484 self.log.exception('failed-sending-message-heartbeat', e=e)
485
486 try:
487 t0 = time.time()
488 lc = LoopingCall(send_msg, t0)
489 lc.start(10)
490 except Exception, e:
491 self.log.exception('failed-kafka-heartbeat', e=e)
492
493
494if __name__ == '__main__':
495 Main().start()