blob: 86b6be0b2284ddea7c0761ace0771453a7890e43 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim OLT Adapter main entry point"""
19
20import argparse
khenaidoob9203542018-09-17 22:56:37 -040021import os
22import time
23
khenaidoo6fdf0ba2018-11-02 14:38:33 -040024import arrow
khenaidoob9203542018-09-17 22:56:37 -040025import yaml
khenaidoo6fdf0ba2018-11-02 14:38:33 -040026from packaging.version import Version
khenaidoob9203542018-09-17 22:56:37 -040027from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
khenaidoo6fdf0ba2018-11-02 14:38:33 -040031
khenaidoofdbad6e2018-11-06 22:26:38 -050032from python.common.structlog_setup import setup_logging, update_logging
33from python.common.utils.asleep import asleep
34from python.common.utils.deferred_utils import TimeOutError
35from python.common.utils.dockerhelpers import get_my_containers_name
36from python.common.utils.nethelpers import get_my_primary_local_ipv4, \
khenaidoob9203542018-09-17 22:56:37 -040037 get_my_primary_interface
khenaidoofdbad6e2018-11-06 22:26:38 -050038from python.common.utils.registry import registry, IComponent
39from python.adapters.kafka.adapter_proxy import AdapterProxy
40from python.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from python.adapters.kafka.core_proxy import CoreProxy
42from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
khenaidoo6fdf0ba2018-11-02 14:38:33 -040043 get_messaging_proxy
khenaidoofdbad6e2018-11-06 22:26:38 -050044from python.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
45from ponsim_olt import PonSimOltAdapter
46from python.protos import third_party
47from python.protos.adapter_pb2 import AdapterConfig
khenaidoob9203542018-09-17 22:56:37 -040048
49_ = third_party
50
51defs = dict(
52 version_file='./VERSION',
53 config=os.environ.get('CONFIG', './ponsim_olt.yml'),
54 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
55 '0-9]+)\..*$'),
56 consul=os.environ.get('CONSUL', 'localhost:8500'),
57 name=os.environ.get('NAME', 'ponsim_olt'),
58 vendor=os.environ.get('VENDOR', 'Voltha Project'),
59 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_olt'),
60 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
61 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
62 etcd=os.environ.get('ETCD', 'localhost:2379'),
63 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
64 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
65 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
khenaidooca301322019-01-09 23:06:32 -050066 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
67 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
khenaidoob9203542018-09-17 22:56:37 -040068 backend=os.environ.get('BACKEND', 'none'),
69 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
70 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
71)
72
73
74def parse_args():
75 parser = argparse.ArgumentParser()
76
77 _help = ('Path to ponsim_onu.yml config file (default: %s). '
78 'If relative, it is relative to main.py of ponsim adapter.'
79 % defs['config'])
80 parser.add_argument('-c', '--config',
81 dest='config',
82 action='store',
83 default=defs['config'],
84 help=_help)
85
86 _help = 'Regular expression for extracting conatiner number from ' \
87 'container name (default: %s)' % defs['container_name_regex']
88 parser.add_argument('-X', '--container-number-extractor',
89 dest='container_name_regex',
90 action='store',
91 default=defs['container_name_regex'],
92 help=_help)
93
94 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
95 parser.add_argument('-C', '--consul',
96 dest='consul',
97 action='store',
98 default=defs['consul'],
99 help=_help)
100
101 _help = 'name of this adapter (default: %s)' % defs['name']
102 parser.add_argument('-na', '--name',
103 dest='name',
104 action='store',
105 default=defs['name'],
106 help=_help)
107
108 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
109 parser.add_argument('-ven', '--vendor',
110 dest='vendor',
111 action='store',
112 default=defs['vendor'],
113 help=_help)
114
115 _help = 'supported device type of this adapter (default: %s)' % defs[
116 'device_type']
117 parser.add_argument('-dt', '--device_type',
118 dest='device_type',
119 action='store',
120 default=defs['device_type'],
121 help=_help)
122
123 _help = 'specifies whether the device type accepts bulk flow updates ' \
124 'adapter (default: %s)' % defs['accept_bulk_flow']
125 parser.add_argument('-abf', '--accept_bulk_flow',
126 dest='accept_bulk_flow',
127 action='store',
128 default=defs['accept_bulk_flow'],
129 help=_help)
130
131 _help = 'specifies whether the device type accepts add/remove flow ' \
132 '(default: %s)' % defs['accept_atomic_flow']
133 parser.add_argument('-aaf', '--accept_atomic_flow',
134 dest='accept_atomic_flow',
135 action='store',
136 default=defs['accept_atomic_flow'],
137 help=_help)
138
139 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
140 parser.add_argument('-e', '--etcd',
141 dest='etcd',
142 action='store',
143 default=defs['etcd'],
144 help=_help)
145
146 _help = ('unique string id of this container instance (default: %s)'
147 % defs['instance_id'])
148 parser.add_argument('-i', '--instance-id',
149 dest='instance_id',
150 action='store',
151 default=defs['instance_id'],
152 help=_help)
153
154 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
155 parser.add_argument('-I', '--interface',
156 dest='interface',
157 action='store',
158 default=defs['interface'],
159 help=_help)
160
161 _help = 'omit startup banner log lines'
162 parser.add_argument('-n', '--no-banner',
163 dest='no_banner',
164 action='store_true',
165 default=False,
166 help=_help)
167
168 _help = 'do not emit periodic heartbeat log messages'
169 parser.add_argument('-N', '--no-heartbeat',
170 dest='no_heartbeat',
171 action='store_true',
172 default=False,
173 help=_help)
174
175 _help = "suppress debug and info logs"
176 parser.add_argument('-q', '--quiet',
177 dest='quiet',
178 action='count',
179 help=_help)
180
181 _help = 'enable verbose logging'
182 parser.add_argument('-v', '--verbose',
183 dest='verbose',
184 action='count',
185 help=_help)
186
187 _help = ('use docker container name as conatiner instance id'
188 ' (overrides -i/--instance-id option)')
189 parser.add_argument('--instance-id-is-container-name',
190 dest='instance_id_is_container_name',
191 action='store_true',
192 default=False,
193 help=_help)
194
195 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
196 'If not '
197 'specified (None), the address from the config file is used'
198 % defs['kafka_adapter'])
199 parser.add_argument('-KA', '--kafka_adapter',
200 dest='kafka_adapter',
201 action='store',
202 default=defs['kafka_adapter'],
203 help=_help)
204
205 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
206 'If not '
207 'specified (None), the address from the config file is used'
208 % defs['kafka_cluster'])
209 parser.add_argument('-KC', '--kafka_cluster',
210 dest='kafka_cluster',
211 action='store',
212 default=defs['kafka_cluster'],
213 help=_help)
214
215 _help = 'backend to use for config persitence'
216 parser.add_argument('-b', '--backend',
217 default=defs['backend'],
218 choices=['none', 'consul', 'etcd'],
219 help=_help)
220
221 _help = 'topic of core on the kafka bus'
222 parser.add_argument('-ct', '--core_topic',
223 dest='core_topic',
224 action='store',
225 default=defs['core_topic'],
226 help=_help)
227
228 args = parser.parse_args()
229
230 # post-processing
231
232 if args.instance_id_is_container_name:
233 args.instance_id = get_my_containers_name()
234
235 return args
236
237
238def load_config(args):
239 path = args.config
240 if path.startswith('.'):
241 dir = os.path.dirname(os.path.abspath(__file__))
242 path = os.path.join(dir, path)
243 path = os.path.abspath(path)
244 with open(path) as fd:
245 config = yaml.load(fd)
246 return config
247
248
249def print_banner(log):
250 log.info(' ____ _ ___ _ _____ ')
251 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| | |_ _| ')
252 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | | | | ')
253 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |___| | ')
254 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_____|_| ')
255 log.info(' ')
256 log.info(' _ _ _ ')
257 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
258 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
259 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
260 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
261 log.info(' |_| ')
262 log.info('(to stop: press Ctrl-C)')
263
264
265@implementer(IComponent)
266class Main(object):
267
268 def __init__(self):
269
270 self.args = args = parse_args()
271 self.config = load_config(args)
272
273 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
274 self.log = setup_logging(self.config.get('logging', {}),
275 args.instance_id,
276 verbosity_adjust=verbosity_adjust)
277 self.log.info('container-number-extractor',
278 regex=args.container_name_regex)
279
280 self.ponsim_olt_adapter_version = self.get_version()
281 self.log.info('Ponsim-OLT-Adapter-Version', version=
282 self.ponsim_olt_adapter_version)
283
284 if not args.no_banner:
285 print_banner(self.log)
286
khenaidoo91ecfd62018-11-04 17:13:42 -0500287 self.adapter = None
khenaidoob9203542018-09-17 22:56:37 -0400288 # Create a unique instance id using the passed-in instance id and
289 # UTC timestamp
290 current_time = arrow.utcnow().timestamp
291 self.instance_id = self.args.instance_id + '_' + str(current_time)
292
293 self.core_topic = args.core_topic
294 self.listening_topic = args.name
295 self.startup_components()
296
297 if not args.no_heartbeat:
298 self.start_heartbeat()
299 self.start_kafka_cluster_heartbeat(self.instance_id)
300
301 def get_version(self):
302 path = defs['version_file']
303 if not path.startswith('/'):
304 dir = os.path.dirname(os.path.abspath(__file__))
305 path = os.path.join(dir, path)
306
307 path = os.path.abspath(path)
308 version_file = open(path, 'r')
309 v = version_file.read()
310
311 # Use Version to validate the version string - exception will be raised
312 # if the version is invalid
313 Version(v)
314
315 version_file.close()
316 return v
317
318 def start(self):
319 self.start_reactor() # will not return except Keyboard interrupt
320
321 def stop(self):
322 pass
323
324 def get_args(self):
325 """Allow access to command line args"""
326 return self.args
327
328 def get_config(self):
329 """Allow access to content of config file"""
330 return self.config
331
332 def _get_adapter_config(self):
333 cfg = AdapterConfig()
334 return cfg
335
336 @inlineCallbacks
337 def startup_components(self):
338 try:
339 self.log.info('starting-internal-components',
340 consul=self.args.consul,
341 etcd=self.args.etcd)
342
343 registry.register('main', self)
344
345 # Update the logger to output the vcore id.
346 self.log = update_logging(instance_id=self.instance_id,
347 vcore_id=None)
348
349 yield registry.register(
350 'kafka_cluster_proxy',
351 KafkaProxy(
352 self.args.consul,
353 self.args.kafka_cluster,
354 config=self.config.get('kafka-cluster-proxy', {})
355 )
356 ).start()
357
358 config = self._get_adapter_config()
359
360 self.core_proxy = CoreProxy(
361 kafka_proxy=None,
362 core_topic=self.core_topic,
363 my_listening_topic=self.listening_topic)
364
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400365 self.adapter_proxy = AdapterProxy(
366 kafka_proxy=None,
367 core_topic=self.core_topic,
368 my_listening_topic=self.listening_topic)
369
khenaidoo91ecfd62018-11-04 17:13:42 -0500370 self.adapter = PonSimOltAdapter(core_proxy=self.core_proxy,
371 adapter_proxy=self.adapter_proxy,
372 config=config)
373
374 ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter)
khenaidoob9203542018-09-17 22:56:37 -0400375
376 yield registry.register(
377 'kafka_adapter_proxy',
378 IKafkaMessagingProxy(
379 kafka_host_port=self.args.kafka_adapter,
380 # TODO: Add KV Store object reference
381 kv_store=self.args.backend,
382 default_topic=self.args.name,
khenaidooca301322019-01-09 23:06:32 -0500383 group_id_prefix=self.args.instance_id,
khenaidoob9203542018-09-17 22:56:37 -0400384 target_cls=ponsim_request_handler
385 )
386 ).start()
387
388 self.core_proxy.kafka_proxy = get_messaging_proxy()
khenaidoo6fdf0ba2018-11-02 14:38:33 -0400389 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
khenaidoob9203542018-09-17 22:56:37 -0400390
391 # retry for ever
392 res = yield self._register_with_core(-1)
393
394 self.log.info('started-internal-services')
395
396 except Exception as e:
397 self.log.exception('Failure-to-start-all-components', e=e)
398
399 @inlineCallbacks
400 def shutdown_components(self):
401 """Execute before the reactor is shut down"""
402 self.log.info('exiting-on-keyboard-interrupt')
403 for component in reversed(registry.iterate()):
404 yield component.stop()
405
406 import threading
407 self.log.info('THREADS:')
408 main_thread = threading.current_thread()
409 for t in threading.enumerate():
410 if t is main_thread:
411 continue
412 if not t.isDaemon():
413 continue
414 self.log.info('joining thread {} {}'.format(
415 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
416 t.join()
417
418 def start_reactor(self):
419 from twisted.internet import reactor
420 reactor.callWhenRunning(
421 lambda: self.log.info('twisted-reactor-started'))
422 reactor.addSystemEventTrigger('before', 'shutdown',
423 self.shutdown_components)
424 reactor.run()
425
426 @inlineCallbacks
427 def _register_with_core(self, retries):
khenaidoob9203542018-09-17 22:56:37 -0400428 while 1:
429 try:
khenaidoo91ecfd62018-11-04 17:13:42 -0500430 resp = yield self.core_proxy.register(
431 self.adapter.adapter_descriptor(),
432 self.adapter.device_types())
433 if resp:
434 self.log.info('registered-with-core',
435 coreId=resp.instance_id)
khenaidoob9203542018-09-17 22:56:37 -0400436 returnValue(resp)
437 except TimeOutError as e:
438 self.log.warn("timeout-when-registering-with-core", e=e)
439 if retries == 0:
440 self.log.exception("no-more-retries", e=e)
441 raise
442 else:
443 retries = retries if retries < 0 else retries - 1
444 yield asleep(defs['retry_interval'])
445 except Exception as e:
446 self.log.exception("failed-registration", e=e)
447 raise
448
449 def start_heartbeat(self):
450
451 t0 = time.time()
452 t0s = time.ctime(t0)
453
454 def heartbeat():
455 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
456
457 lc = LoopingCall(heartbeat)
458 lc.start(10)
459
460 # Temporary function to send a heartbeat message to the external kafka
461 # broker
462 def start_kafka_cluster_heartbeat(self, instance_id):
463 # For heartbeat we will send a message to a specific "voltha-heartbeat"
464 # topic. The message is a protocol buf
465 # message
466 message = dict(
467 type='heartbeat',
468 adapter=self.args.name,
469 instance=instance_id,
470 ip=get_my_primary_local_ipv4()
471 )
472 topic = defs['heartbeat_topic']
473
474 def send_msg(start_time):
475 try:
476 kafka_cluster_proxy = get_kafka_proxy()
477 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
478 # self.log.debug('kafka-proxy-available')
479 message['ts'] = arrow.utcnow().timestamp
480 message['uptime'] = time.time() - start_time
481 # self.log.debug('start-kafka-heartbeat')
482 kafka_cluster_proxy.send_message(topic, dumps(message))
483 else:
484 self.log.error('kafka-proxy-unavailable')
485 except Exception, e:
486 self.log.exception('failed-sending-message-heartbeat', e=e)
487
488 try:
489 t0 = time.time()
490 lc = LoopingCall(send_msg, t0)
491 lc.start(10)
492 except Exception, e:
493 self.log.exception('failed-kafka-heartbeat', e=e)
494
495
496if __name__ == '__main__':
497 Main().start()