#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

18"""Ponsim OLT Adapter main entry point"""
19
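# Illustrative invocation (the broker addresses below are hypothetical; every
# option also has an environment-variable default, see `defs` further down):
#
#   python main.py --kafka_adapter kafka:9092 --kafka_cluster kafka:9092 \
#       --core_topic rwcore --verbose
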
import argparse
import os
import time

import arrow
import yaml
from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from pyvoltha.common.structlog_setup import setup_logging, update_logging
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.deferred_utils import TimeOutError
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
    get_my_primary_interface
from pyvoltha.common.utils.registry import registry, IComponent
from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
from pyvoltha.adapters.kafka.core_proxy import CoreProxy
from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
    get_messaging_proxy
from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from ponsim_olt import PonSimOltAdapter
from voltha_protos.adapter_pb2 import AdapterConfig

defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './ponsim_olt.yml'),
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        '^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'ponsim_olt'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'ponsim_olt'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    retry_interval=os.environ.get('RETRY_INTERVAL', 2),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
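# Any of the defaults above can be overridden through the environment before
# the adapter starts, e.g. (illustrative values only):
#
#   export KAFKA_ADAPTER=kafka.voltha.svc:9092
#   export CORE_TOPIC=rwcore
#
# or through the matching command-line options parsed below.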


def parse_args():
    """Parse command-line options, falling back to the defaults in `defs`."""
    parser = argparse.ArgumentParser()

    _help = ('Path to ponsim_olt.yml config file (default: %s). '
             'If relative, it is relative to main.py of ponsim adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting container number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    _help = 'specifies whether the device type accepts bulk flow updates ' \
            '(default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            'updates (default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to receive on (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'suppress debug and info logs'
    parser.add_argument('-q', '--quiet',
                        dest='quiet',
                        action='count',
                        help=_help)

    _help = 'enable verbose logging'
    parser.add_argument('-v', '--verbose',
                        dest='verbose',
                        action='count',
                        help=_help)

    _help = ('use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             'If not specified (None), the address from the config file is '
             'used' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persistence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args


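# The YAML file loaded below is expected to provide the sections consumed
# later in this module -- at minimum a 'logging' block (handed to
# setup_logging) and, optionally, a 'kafka-cluster-proxy' block; the exact
# contents are deployment specific.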
def load_config(args):
    path = args.config
    if path.startswith('.'):
        dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        config = yaml.load(fd)
    return config


def print_banner(log):
    log.info(' ____                 _               ___  _   _____ ')
    log.info('|  _ \ ___  _ __  ___(_)_ __ ___     / _ \| | |_   _|')
    log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \   | | | | |   | |  ')
    log.info('|  __/ (_) | | | \__ \ | | | | | |  | |_| | |___| |  ')
    log.info('|_|   \___/|_| |_|___/_|_| |_| |_|   \___/|_____|_|  ')
    log.info('                                                      ')
    log.info('    _       _             _            ')
    log.info('   / \   __| | __ _ _ __ | |_ ___ _ __ ')
    log.info('  / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__|')
    log.info(' / ___ \ (_| | (_| | |_) | ||  __/ |   ')
    log.info('/_/   \_\__,_|\__,_| .__/ \__\___|_|   ')
    log.info('                   |_|                 ')
    log.info('(to stop: press Ctrl-C)')


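# Main implements pyvoltha's IComponent interface so that it can be placed in
# the component registry alongside the kafka proxies (see
# registry.register('main', self) in startup_components below).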
@implementer(IComponent)
class Main(object):

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.ponsim_olt_adapter_version = self.get_version()
        self.log.info('Ponsim-OLT-Adapter-Version',
                      version=self.ponsim_olt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)
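        # The composite id therefore has the form '<instance-id>_<utc-epoch>',
        # e.g. 'ponsim_olt_1554900000' (illustrative value).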

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        path = defs['version_file']
        if not path.startswith('/'):
            dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(dir, path)

        path = os.path.abspath(path)
        version_file = open(path, 'r')
        v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        version_file.close()
        return v

    def start(self):
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        cfg = AdapterConfig()
        return cfg

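    # Startup, as coded below, wires the pieces together in this order:
    # kafka cluster proxy -> core/adapter proxies -> PonSimOltAdapter ->
    # AdapterRequestFacade -> inter-container messaging proxy -> registration
    # with the VOLTHA core (retried until it succeeds).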
    @inlineCallbacks
    def startup_components(self):
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = PonSimOltAdapter(core_proxy=self.core_proxy,
                                            adapter_proxy=self.adapter_proxy,
                                            config=config)

            ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                          core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    target_cls=ponsim_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            res = yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            if not t.isDaemon():
                continue
            self.log.info('joining thread {} {}'.format(
                t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
            t.join()

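    # start_reactor() blocks in reactor.run() until the process is interrupted;
    # shutdown_components is registered as a 'before shutdown' trigger so every
    # registered component is stopped in reverse registration order.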
    def start_reactor(self):
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

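    # Registration with the core is retried below: a negative 'retries' value
    # (as passed from startup_components) means retry indefinitely, sleeping
    # defs['retry_interval'] seconds between attempts that time out.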
    @inlineCallbacks
    def _register_with_core(self, retries):
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)
                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

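    # The local heartbeat below only logs an 'up' status line every 10 seconds;
    # the kafka-cluster heartbeat that follows publishes to the external broker.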
    def start_heartbeat(self):

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        # For the heartbeat we send a message to the topic named by
        # defs['heartbeat_topic'] ("adapters.heartbeat" by default).  The
        # message is a JSON-serialized dict (see send_msg below).
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
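
    # An example of what send_msg publishes on the heartbeat topic (field
    # values are illustrative):
    #
    #   {"type": "heartbeat", "adapter": "ponsim_olt",
    #    "instance": "ponsim_olt_1554900000", "ip": "172.20.10.3",
    #    "ts": 1554900010, "uptime": 10.0}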


if __name__ == '__main__':
    Main().start()