blob: e3c5b7f2eaa8fc112aec1fe8280eb7206df2c1c1 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim OLT Adapter main entry point"""
19
20import argparse
khenaidoob9203542018-09-17 22:56:37 -040021import os
22import time
23
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024import arrow
khenaidoob9203542018-09-17 22:56:37 -040025import yaml
khenaidoo6fdf0ba2018-11-02 14:38:33 -040026from packaging.version import Version
khenaidoob9203542018-09-17 22:56:37 -040027from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
khenaidoo6fdf0ba2018-11-02 14:38:33 -040031
khenaidoofdbad6e2018-11-06 22:26:38 -050032from python.common.structlog_setup import setup_logging, update_logging
33from python.common.utils.asleep import asleep
34from python.common.utils.deferred_utils import TimeOutError
35from python.common.utils.dockerhelpers import get_my_containers_name
36from python.common.utils.nethelpers import get_my_primary_local_ipv4, \
khenaidoob9203542018-09-17 22:56:37 -040037 get_my_primary_interface
khenaidoofdbad6e2018-11-06 22:26:38 -050038from python.common.utils.registry import registry, IComponent
39from python.adapters.kafka.adapter_proxy import AdapterProxy
40from python.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from python.adapters.kafka.core_proxy import CoreProxy
42from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040043 get_messaging_proxy
khenaidoofdbad6e2018-11-06 22:26:38 -050044from python.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from ponsim_olt import PonSimOltAdapter
46from python.protos import third_party
47from python.protos.adapter_pb2 import AdapterConfig
khenaidoob9203542018-09-17 22:56:37 -040048
49_ = third_party
50
51defs = dict(
52 version_file='./VERSION',
53 config=os.environ.get('CONFIG', './ponsim_olt.yml'),
54 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
55 '0-9]+)\..*$'),
56 consul=os.environ.get('CONSUL', 'localhost:8500'),
57 name=os.environ.get('NAME', 'ponsim_olt'),
58 vendor=os.environ.get('VENDOR', 'Voltha Project'),
59 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_olt'),
60 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
61 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
62 etcd=os.environ.get('ETCD', 'localhost:2379'),
63 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
64 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
65 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
khenaidooca301322019-01-09 23:06:32 -050066 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
67 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
khenaidoob9203542018-09-17 22:56:37 -040068 backend=os.environ.get('BACKEND', 'none'),
69 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
70 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
71)
72
73
74def parse_args():
75 parser = argparse.ArgumentParser()
76
77 _help = ('Path to ponsim_onu.yml config file (default: %s). '
78 'If relative, it is relative to main.py of ponsim adapter.'
79 % defs['config'])
80 parser.add_argument('-c', '--config',
81 dest='config',
82 action='store',
83 default=defs['config'],
84 help=_help)
85
86 _help = 'Regular expression for extracting conatiner number from ' \
87 'container name (default: %s)' % defs['container_name_regex']
88 parser.add_argument('-X', '--container-number-extractor',
89 dest='container_name_regex',
90 action='store',
91 default=defs['container_name_regex'],
92 help=_help)
93
94 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
95 parser.add_argument('-C', '--consul',
96 dest='consul',
97 action='store',
98 default=defs['consul'],
99 help=_help)
100
101 _help = 'name of this adapter (default: %s)' % defs['name']
102 parser.add_argument('-na', '--name',
103 dest='name',
104 action='store',
105 default=defs['name'],
106 help=_help)
107
108 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
109 parser.add_argument('-ven', '--vendor',
110 dest='vendor',
111 action='store',
112 default=defs['vendor'],
113 help=_help)
114
115 _help = 'supported device type of this adapter (default: %s)' % defs[
116 'device_type']
117 parser.add_argument('-dt', '--device_type',
118 dest='device_type',
119 action='store',
120 default=defs['device_type'],
121 help=_help)
122
123 _help = 'specifies whether the device type accepts bulk flow updates ' \
124 'adapter (default: %s)' % defs['accept_bulk_flow']
125 parser.add_argument('-abf', '--accept_bulk_flow',
126 dest='accept_bulk_flow',
127 action='store',
128 default=defs['accept_bulk_flow'],
129 help=_help)
130
131 _help = 'specifies whether the device type accepts add/remove flow ' \
132 '(default: %s)' % defs['accept_atomic_flow']
133 parser.add_argument('-aaf', '--accept_atomic_flow',
134 dest='accept_atomic_flow',
135 action='store',
136 default=defs['accept_atomic_flow'],
137 help=_help)
138
139 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
140 parser.add_argument('-e', '--etcd',
141 dest='etcd',
142 action='store',
143 default=defs['etcd'],
144 help=_help)
145
146 _help = ('unique string id of this container instance (default: %s)'
147 % defs['instance_id'])
148 parser.add_argument('-i', '--instance-id',
149 dest='instance_id',
150 action='store',
151 default=defs['instance_id'],
152 help=_help)
153
154 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
155 parser.add_argument('-I', '--interface',
156 dest='interface',
157 action='store',
158 default=defs['interface'],
159 help=_help)
160
161 _help = 'omit startup banner log lines'
162 parser.add_argument('-n', '--no-banner',
163 dest='no_banner',
164 action='store_true',
165 default=False,
166 help=_help)
167
168 _help = 'do not emit periodic heartbeat log messages'
169 parser.add_argument('-N', '--no-heartbeat',
170 dest='no_heartbeat',
171 action='store_true',
172 default=False,
173 help=_help)
174
175 _help = "suppress debug and info logs"
176 parser.add_argument('-q', '--quiet',
177 dest='quiet',
178 action='count',
179 help=_help)
180
181 _help = 'enable verbose logging'
182 parser.add_argument('-v', '--verbose',
183 dest='verbose',
184 action='count',
185 help=_help)
186
187 _help = ('use docker container name as conatiner instance id'
188 ' (overrides -i/--instance-id option)')
189 parser.add_argument('--instance-id-is-container-name',
190 dest='instance_id_is_container_name',
191 action='store_true',
192 default=False,
193 help=_help)
194
195 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
196 'If not '
197 'specified (None), the address from the config file is used'
198 % defs['kafka_adapter'])
199 parser.add_argument('-KA', '--kafka_adapter',
200 dest='kafka_adapter',
201 action='store',
202 default=defs['kafka_adapter'],
203 help=_help)
204
205 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
206 'If not '
207 'specified (None), the address from the config file is used'
208 % defs['kafka_cluster'])
209 parser.add_argument('-KC', '--kafka_cluster',
210 dest='kafka_cluster',
211 action='store',
212 default=defs['kafka_cluster'],
213 help=_help)
214
215 _help = 'backend to use for config persitence'
216 parser.add_argument('-b', '--backend',
217 default=defs['backend'],
218 choices=['none', 'consul', 'etcd'],
219 help=_help)
220
221 _help = 'topic of core on the kafka bus'
222 parser.add_argument('-ct', '--core_topic',
223 dest='core_topic',
224 action='store',
225 default=defs['core_topic'],
226 help=_help)
227
228 args = parser.parse_args()
229
230 # post-processing
231
232 if args.instance_id_is_container_name:
233 args.instance_id = get_my_containers_name()
234
235 return args
236
237
238def load_config(args):
239 path = args.config
240 if path.startswith('.'):
241 dir = os.path.dirname(os.path.abspath(__file__))
242 path = os.path.join(dir, path)
243 path = os.path.abspath(path)
244 with open(path) as fd:
245 config = yaml.load(fd)
246 return config
247
248
249def print_banner(log):
250 log.info(' ____ _ ___ _ _____ ')
251 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| | |_ _| ')
252 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | | | | ')
253 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |___| | ')
254 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_____|_| ')
255 log.info(' ')
256 log.info(' _ _ _ ')
257 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
258 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
259 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
260 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
261 log.info(' |_| ')
262 log.info('(to stop: press Ctrl-C)')
263
264
265@implementer(IComponent)
266class Main(object):
267
268 def __init__(self):
269
270 self.args = args = parse_args()
271 self.config = load_config(args)
272
273 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
274 self.log = setup_logging(self.config.get('logging', {}),
275 args.instance_id,
276 verbosity_adjust=verbosity_adjust)
277 self.log.info('container-number-extractor',
278 regex=args.container_name_regex)
279
280 self.ponsim_olt_adapter_version = self.get_version()
281 self.log.info('Ponsim-OLT-Adapter-Version', version=
282 self.ponsim_olt_adapter_version)
283
284 if not args.no_banner:
285 print_banner(self.log)
286
khenaidoo91ecfd62018-11-04 17:13:42 -0500287 self.adapter = None
khenaidoob9203542018-09-17 22:56:37 -0400288 # Create a unique instance id using the passed-in instance id and
289 # UTC timestamp
290 current_time = arrow.utcnow().timestamp
291 self.instance_id = self.args.instance_id + '_' + str(current_time)
292
293 self.core_topic = args.core_topic
294 self.listening_topic = args.name
295 self.startup_components()
296
297 if not args.no_heartbeat:
298 self.start_heartbeat()
299 self.start_kafka_cluster_heartbeat(self.instance_id)
300
301 def get_version(self):
302 path = defs['version_file']
303 if not path.startswith('/'):
304 dir = os.path.dirname(os.path.abspath(__file__))
305 path = os.path.join(dir, path)
306
307 path = os.path.abspath(path)
308 version_file = open(path, 'r')
309 v = version_file.read()
310
311 # Use Version to validate the version string - exception will be raised
312 # if the version is invalid
313 Version(v)
314
315 version_file.close()
316 return v
317
318 def start(self):
319 self.start_reactor() # will not return except Keyboard interrupt
320
321 def stop(self):
322 pass
323
324 def get_args(self):
325 """Allow access to command line args"""
326 return self.args
327
328 def get_config(self):
329 """Allow access to content of config file"""
330 return self.config
331
332 def _get_adapter_config(self):
333 cfg = AdapterConfig()
334 return cfg
335
336 @inlineCallbacks
337 def startup_components(self):
338 try:
339 self.log.info('starting-internal-components',
340 consul=self.args.consul,
341 etcd=self.args.etcd)
342
343 registry.register('main', self)
344
345 # Update the logger to output the vcore id.
346 self.log = update_logging(instance_id=self.instance_id,
347 vcore_id=None)
348
349 yield registry.register(
350 'kafka_cluster_proxy',
351 KafkaProxy(
352 self.args.consul,
353 self.args.kafka_cluster,
354 config=self.config.get('kafka-cluster-proxy', {})
355 )
356 ).start()
357
358 config = self._get_adapter_config()
359
360 self.core_proxy = CoreProxy(
361 kafka_proxy=None,
khenaidoo54e0ddf2019-02-27 16:21:33 -0500362 default_core_topic=self.core_topic,
khenaidoob9203542018-09-17 22:56:37 -0400363 my_listening_topic=self.listening_topic)
364
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400365 self.adapter_proxy = AdapterProxy(
366 kafka_proxy=None,
367 core_topic=self.core_topic,
368 my_listening_topic=self.listening_topic)
369
khenaidoo91ecfd62018-11-04 17:13:42 -0500370 self.adapter = PonSimOltAdapter(core_proxy=self.core_proxy,
371 adapter_proxy=self.adapter_proxy,
372 config=config)
373
khenaidoo54e0ddf2019-02-27 16:21:33 -0500374 ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter,
375 core_proxy=self.core_proxy)
khenaidoob9203542018-09-17 22:56:37 -0400376
377 yield registry.register(
378 'kafka_adapter_proxy',
379 IKafkaMessagingProxy(
380 kafka_host_port=self.args.kafka_adapter,
381 # TODO: Add KV Store object reference
382 kv_store=self.args.backend,
383 default_topic=self.args.name,
khenaidooca301322019-01-09 23:06:32 -0500384 group_id_prefix=self.args.instance_id,
khenaidoob9203542018-09-17 22:56:37 -0400385 target_cls=ponsim_request_handler
386 )
387 ).start()
388
389 self.core_proxy.kafka_proxy = get_messaging_proxy()
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400390 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
khenaidoob9203542018-09-17 22:56:37 -0400391
392 # retry for ever
393 res = yield self._register_with_core(-1)
394
395 self.log.info('started-internal-services')
396
397 except Exception as e:
398 self.log.exception('Failure-to-start-all-components', e=e)
399
400 @inlineCallbacks
401 def shutdown_components(self):
402 """Execute before the reactor is shut down"""
403 self.log.info('exiting-on-keyboard-interrupt')
404 for component in reversed(registry.iterate()):
405 yield component.stop()
406
407 import threading
408 self.log.info('THREADS:')
409 main_thread = threading.current_thread()
410 for t in threading.enumerate():
411 if t is main_thread:
412 continue
413 if not t.isDaemon():
414 continue
415 self.log.info('joining thread {} {}'.format(
416 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
417 t.join()
418
419 def start_reactor(self):
420 from twisted.internet import reactor
421 reactor.callWhenRunning(
422 lambda: self.log.info('twisted-reactor-started'))
423 reactor.addSystemEventTrigger('before', 'shutdown',
424 self.shutdown_components)
425 reactor.run()
426
427 @inlineCallbacks
428 def _register_with_core(self, retries):
khenaidoob9203542018-09-17 22:56:37 -0400429 while 1:
430 try:
khenaidoo91ecfd62018-11-04 17:13:42 -0500431 resp = yield self.core_proxy.register(
432 self.adapter.adapter_descriptor(),
433 self.adapter.device_types())
434 if resp:
435 self.log.info('registered-with-core',
436 coreId=resp.instance_id)
khenaidoob9203542018-09-17 22:56:37 -0400437 returnValue(resp)
438 except TimeOutError as e:
439 self.log.warn("timeout-when-registering-with-core", e=e)
440 if retries == 0:
441 self.log.exception("no-more-retries", e=e)
442 raise
443 else:
444 retries = retries if retries < 0 else retries - 1
445 yield asleep(defs['retry_interval'])
446 except Exception as e:
447 self.log.exception("failed-registration", e=e)
448 raise
449
450 def start_heartbeat(self):
451
452 t0 = time.time()
453 t0s = time.ctime(t0)
454
455 def heartbeat():
456 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
457
458 lc = LoopingCall(heartbeat)
459 lc.start(10)
460
461 # Temporary function to send a heartbeat message to the external kafka
462 # broker
463 def start_kafka_cluster_heartbeat(self, instance_id):
464 # For heartbeat we will send a message to a specific "voltha-heartbeat"
465 # topic. The message is a protocol buf
466 # message
467 message = dict(
468 type='heartbeat',
469 adapter=self.args.name,
470 instance=instance_id,
471 ip=get_my_primary_local_ipv4()
472 )
473 topic = defs['heartbeat_topic']
474
475 def send_msg(start_time):
476 try:
477 kafka_cluster_proxy = get_kafka_proxy()
478 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
479 # self.log.debug('kafka-proxy-available')
480 message['ts'] = arrow.utcnow().timestamp
481 message['uptime'] = time.time() - start_time
482 # self.log.debug('start-kafka-heartbeat')
483 kafka_cluster_proxy.send_message(topic, dumps(message))
484 else:
485 self.log.error('kafka-proxy-unavailable')
486 except Exception, e:
487 self.log.exception('failed-sending-message-heartbeat', e=e)
488
489 try:
490 t0 = time.time()
491 lc = LoopingCall(send_msg, t0)
492 lc.start(10)
493 except Exception, e:
494 self.log.exception('failed-kafka-heartbeat', e=e)
495
496
497if __name__ == '__main__':
498 Main().start()