blob: 63e2bc4c6828ab66c25a7a399c5e822a06345abf [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim ONU Adapter main entry point"""
19
20import argparse
21import arrow
22import os
23import time
24
25import yaml
26from simplejson import dumps
27from twisted.internet.defer import inlineCallbacks, returnValue
28from twisted.internet.task import LoopingCall
29from zope.interface import implementer
30from adapters.protos import third_party
31from adapters.common.structlog_setup import setup_logging, update_logging
32from adapters.common.utils.dockerhelpers import get_my_containers_name
33from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
34 get_my_primary_interface
35from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
36from adapters.common.utils.registry import registry, IComponent
37from packaging.version import Version
38from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
39from adapters.ponsim_onu.ponsim_onu import PonSimOnuAdapter
40from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
41from adapters.kafka.adapter_request_facade import AdapterRequestFacade
42from adapters.kafka.core_proxy import CoreProxy
43from adapters.common.utils.deferred_utils import TimeOutError
44from adapters.common.utils.asleep import asleep
45
# Bind third_party to a throwaway name so the import above is referenced
# (presumably it is imported only for its side effects).
_ = third_party

# Default settings for the adapter.  Most values can be overridden through
# environment variables; parse_args() then uses them as command line defaults.
#
# NOTE: values taken from the environment are always strings.  In particular
# accept_bulk_flow / accept_atomic_flow become non-empty strings (and are
# therefore truthy) whenever the corresponding variable is set, whatever its
# content.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './ponsim_onu.yml'),
    # Raw string so the backslashes reach the regex engine untouched
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'ponsim_onu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Used as a sleep delay (seconds) between registration retries, so it
    # must be a number even when it comes from the environment as a string.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
69
70
def parse_args(argv=None):
    """Parse the adapter's command line options.

    Every option defaults to the matching entry in the module-level ``defs``
    dictionary.

    :param argv: optional list of argument strings to parse.  When ``None``
        (the default) argparse reads ``sys.argv[1:]``.
    :return: the ``argparse.Namespace`` holding the parsed options.  When
        ``--instance-id-is-container-name`` is given, ``instance_id`` is
        replaced by the result of ``get_my_containers_name()``.
    """
    parser = argparse.ArgumentParser()

    _help = ('Path to ponsim_onu.yml config file (default: %s). '
             'If relative, it is relative to main.py of ponsim adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    # NOTE: the two accept_* options are stored as given, i.e. as strings
    # when passed on the command line.
    _help = 'specifies whether the device type accepts bulk flow updates ' \
            '(default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    # -q / -v are counted and have no default, so they are None when the
    # option is not given at all.
    _help = "suppress debug and info logs"
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file '
             'is used' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file '
             'is used' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args(argv)

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
233
234
def load_config(args):
    """Read the adapter's YAML configuration file.

    :param args: parsed command line options; only ``args.config`` (the path
        of the configuration file) is used.  A path starting with ``.`` is
        resolved relative to the directory containing this module.
    :return: the parsed configuration, or an empty dict when the file
        contains no YAML document.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load only builds plain Python objects; yaml.load without an
        # explicit Loader can instantiate arbitrary objects from the file.
        config = yaml.safe_load(fd)
    # An empty file parses to None; callers expect a mapping with .get().
    return config or {}
244
245
def print_banner(log):
    """Write the ASCII-art startup banner to ``log``.

    Each banner row is emitted with its own ``log.info`` call, in order,
    followed by the hint on how to stop the process.
    """
    banner_rows = (
        ' ____ _ ___ _ _ _ _ ',
        '| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ',
        '| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | \| | | | | ',
        '| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ',
        '|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ',
        ' _ _ _ ',
        ' / \ __| | __ _ _ __ | |_ ___ _ __ ',
        ' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ',
        ' / ___ \ (_| | (_| | |_) | || __/ | ',
        '/_/ \_\__,_|\__,_| .__/ \__\___|_| ',
        ' |_| ',
        '(to stop: press Ctrl-C)',
    )
    for row in banner_rows:
        log.info(row)
259
260
@implementer(IComponent)
class Main(object):
    """Main component of the Ponsim ONU adapter process.

    Construction parses the command line, loads the configuration, sets up
    logging, kicks off the internal components (kafka proxies, core proxy,
    adapter) and, unless disabled, the periodic heartbeats.  ``start()`` then
    runs the twisted reactor.
    """

    def __init__(self):
        """Parse options, configure logging and launch the components."""
        self.args = args = parse_args()
        self.config = load_config(args)

        # -v and -q are counters that are None when absent
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.ponsim_olt_adapter_version = self.get_version()
        self.log.info('Ponsim-ONU-Adapter-Version',
                      version=self.ponsim_olt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        # Returns a Deferred that is intentionally not waited on here;
        # failures are logged inside startup_components itself.
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and validate the adapter version from the version file.

        A relative ``defs['version_file']`` is resolved against the directory
        containing this module.

        :return: the raw content of the version file.
        :raises: whatever ``Version`` raises when the content is not a valid
            version string; I/O errors from opening or reading the file.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # "with" guarantees the file is closed even if reading fails
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """Component stop hook; nothing to do for the main component."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a fresh, default-initialised AdapterConfig."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components, then register with
        the core.

        Any exception raised along the way is logged and swallowed, so the
        returned Deferred does not fail.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # The messaging proxy does not exist yet; kafka_proxy is filled
            # in below once the adapter proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            ponsim_onu_adapter = PonSimOnuAdapter(
                adapter_agent=self.core_proxy, config=config)
            ponsim_request_handler = AdapterRequestFacade(
                adapter=ponsim_onu_adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    target_cls=ponsim_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()

            # A negative retry count means: retry forever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): this skips NON-daemon threads, so only daemon
            # threads are joined below - looks inverted, confirm the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the shutdown hook and run the twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter with the core, retrying on timeout.

        :param retries: number of additional attempts allowed after a
            timeout; a negative value retries forever.
        :return: (via the Deferred) the response of the core.
        :raises TimeOutError: when the last allowed attempt timed out.  Any
            other exception from the registration is logged and re-raised.
        """
        # Send registration to Core with adapter specs
        adapter = Adapter()
        adapter.id = self.args.name
        # Use the configured vendor (-ven/--vendor), not the adapter name
        adapter.vendor = self.args.vendor
        adapter.version = self.ponsim_olt_adapter_version
        while 1:
            try:
                resp = yield self.core_proxy.register(adapter)
                self.log.info('registration-response', response=resp)
                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative counts are left untouched (infinite retries)
                    retries = retries if retries < 0 else retries - 1
                    # The interval may be a string when it was taken from
                    # the environment; the delay has to be a number.
                    yield asleep(float(defs['retry_interval']))
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Log an 'up' status with the current uptime every 10 seconds."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Publish a heartbeat message to kafka every 10 seconds.

        :param instance_id: value reported in the ``instance`` field of the
            heartbeat message.
        """
        # The heartbeat is a JSON-serialised dict sent to the topic named by
        # defs['heartbeat_topic'].
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
482
483
# Script entry point: constructing Main parses the command line and launches
# the components; start() then runs the reactor until it is stopped.
if __name__ == '__main__':
    Main().start()