blob: 53745ee6169741017a123de7251a9b13b9a8b091 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim OLT Adapter main entry point"""
19
20import argparse
21import arrow
22import os
23import time
24
25import yaml
26from simplejson import dumps
27from twisted.internet.defer import inlineCallbacks, returnValue
28from twisted.internet.task import LoopingCall
29from zope.interface import implementer
30from adapters.protos import third_party
31from adapters.common.structlog_setup import setup_logging, update_logging
32from adapters.common.utils.dockerhelpers import get_my_containers_name
33from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
34 get_my_primary_interface
35from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
36from adapters.common.utils.registry import registry, IComponent
37from packaging.version import Version
38from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
39from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
40from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
41from adapters.kafka.adapter_request_facade import AdapterRequestFacade
42from adapters.kafka.core_proxy import CoreProxy
43from adapters.common.utils.deferred_utils import TimeOutError
44from adapters.common.utils.asleep import asleep
45
46_ = third_party
47
48defs = dict(
49 version_file='./VERSION',
50 config=os.environ.get('CONFIG', './ponsim_olt.yml'),
51 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
52 '0-9]+)\..*$'),
53 consul=os.environ.get('CONSUL', 'localhost:8500'),
54 name=os.environ.get('NAME', 'ponsim_olt'),
55 vendor=os.environ.get('VENDOR', 'Voltha Project'),
56 device_type=os.environ.get('DEVICE_TYPE', 'ponsim_olt'),
57 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
58 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
59 etcd=os.environ.get('ETCD', 'localhost:2379'),
60 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
61 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
62 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
63 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
64 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
65 backend=os.environ.get('BACKEND', 'none'),
66 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
67 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
68)
69
70
71def parse_args():
72 parser = argparse.ArgumentParser()
73
74 _help = ('Path to ponsim_onu.yml config file (default: %s). '
75 'If relative, it is relative to main.py of ponsim adapter.'
76 % defs['config'])
77 parser.add_argument('-c', '--config',
78 dest='config',
79 action='store',
80 default=defs['config'],
81 help=_help)
82
83 _help = 'Regular expression for extracting conatiner number from ' \
84 'container name (default: %s)' % defs['container_name_regex']
85 parser.add_argument('-X', '--container-number-extractor',
86 dest='container_name_regex',
87 action='store',
88 default=defs['container_name_regex'],
89 help=_help)
90
91 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
92 parser.add_argument('-C', '--consul',
93 dest='consul',
94 action='store',
95 default=defs['consul'],
96 help=_help)
97
98 _help = 'name of this adapter (default: %s)' % defs['name']
99 parser.add_argument('-na', '--name',
100 dest='name',
101 action='store',
102 default=defs['name'],
103 help=_help)
104
105 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
106 parser.add_argument('-ven', '--vendor',
107 dest='vendor',
108 action='store',
109 default=defs['vendor'],
110 help=_help)
111
112 _help = 'supported device type of this adapter (default: %s)' % defs[
113 'device_type']
114 parser.add_argument('-dt', '--device_type',
115 dest='device_type',
116 action='store',
117 default=defs['device_type'],
118 help=_help)
119
120 _help = 'specifies whether the device type accepts bulk flow updates ' \
121 'adapter (default: %s)' % defs['accept_bulk_flow']
122 parser.add_argument('-abf', '--accept_bulk_flow',
123 dest='accept_bulk_flow',
124 action='store',
125 default=defs['accept_bulk_flow'],
126 help=_help)
127
128 _help = 'specifies whether the device type accepts add/remove flow ' \
129 '(default: %s)' % defs['accept_atomic_flow']
130 parser.add_argument('-aaf', '--accept_atomic_flow',
131 dest='accept_atomic_flow',
132 action='store',
133 default=defs['accept_atomic_flow'],
134 help=_help)
135
136 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
137 parser.add_argument('-e', '--etcd',
138 dest='etcd',
139 action='store',
140 default=defs['etcd'],
141 help=_help)
142
143 _help = ('unique string id of this container instance (default: %s)'
144 % defs['instance_id'])
145 parser.add_argument('-i', '--instance-id',
146 dest='instance_id',
147 action='store',
148 default=defs['instance_id'],
149 help=_help)
150
151 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
152 parser.add_argument('-I', '--interface',
153 dest='interface',
154 action='store',
155 default=defs['interface'],
156 help=_help)
157
158 _help = 'omit startup banner log lines'
159 parser.add_argument('-n', '--no-banner',
160 dest='no_banner',
161 action='store_true',
162 default=False,
163 help=_help)
164
165 _help = 'do not emit periodic heartbeat log messages'
166 parser.add_argument('-N', '--no-heartbeat',
167 dest='no_heartbeat',
168 action='store_true',
169 default=False,
170 help=_help)
171
172 _help = "suppress debug and info logs"
173 parser.add_argument('-q', '--quiet',
174 dest='quiet',
175 action='count',
176 help=_help)
177
178 _help = 'enable verbose logging'
179 parser.add_argument('-v', '--verbose',
180 dest='verbose',
181 action='count',
182 help=_help)
183
184 _help = ('use docker container name as conatiner instance id'
185 ' (overrides -i/--instance-id option)')
186 parser.add_argument('--instance-id-is-container-name',
187 dest='instance_id_is_container_name',
188 action='store_true',
189 default=False,
190 help=_help)
191
192 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
193 'If not '
194 'specified (None), the address from the config file is used'
195 % defs['kafka_adapter'])
196 parser.add_argument('-KA', '--kafka_adapter',
197 dest='kafka_adapter',
198 action='store',
199 default=defs['kafka_adapter'],
200 help=_help)
201
202 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
203 'If not '
204 'specified (None), the address from the config file is used'
205 % defs['kafka_cluster'])
206 parser.add_argument('-KC', '--kafka_cluster',
207 dest='kafka_cluster',
208 action='store',
209 default=defs['kafka_cluster'],
210 help=_help)
211
212 _help = 'backend to use for config persitence'
213 parser.add_argument('-b', '--backend',
214 default=defs['backend'],
215 choices=['none', 'consul', 'etcd'],
216 help=_help)
217
218 _help = 'topic of core on the kafka bus'
219 parser.add_argument('-ct', '--core_topic',
220 dest='core_topic',
221 action='store',
222 default=defs['core_topic'],
223 help=_help)
224
225 args = parser.parse_args()
226
227 # post-processing
228
229 if args.instance_id_is_container_name:
230 args.instance_id = get_my_containers_name()
231
232 return args
233
234
235def load_config(args):
236 path = args.config
237 if path.startswith('.'):
238 dir = os.path.dirname(os.path.abspath(__file__))
239 path = os.path.join(dir, path)
240 path = os.path.abspath(path)
241 with open(path) as fd:
242 config = yaml.load(fd)
243 return config
244
245
246def print_banner(log):
247 log.info(' ____ _ ___ _ _____ ')
248 log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| | |_ _| ')
249 log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | | | | ')
250 log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |___| | ')
251 log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_____|_| ')
252 log.info(' ')
253 log.info(' _ _ _ ')
254 log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
255 log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
256 log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
257 log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
258 log.info(' |_| ')
259 log.info('(to stop: press Ctrl-C)')
260
261
262@implementer(IComponent)
263class Main(object):
264
265 def __init__(self):
266
267 self.args = args = parse_args()
268 self.config = load_config(args)
269
270 verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
271 self.log = setup_logging(self.config.get('logging', {}),
272 args.instance_id,
273 verbosity_adjust=verbosity_adjust)
274 self.log.info('container-number-extractor',
275 regex=args.container_name_regex)
276
277 self.ponsim_olt_adapter_version = self.get_version()
278 self.log.info('Ponsim-OLT-Adapter-Version', version=
279 self.ponsim_olt_adapter_version)
280
281 if not args.no_banner:
282 print_banner(self.log)
283
284 # Create a unique instance id using the passed-in instance id and
285 # UTC timestamp
286 current_time = arrow.utcnow().timestamp
287 self.instance_id = self.args.instance_id + '_' + str(current_time)
288
289 self.core_topic = args.core_topic
290 self.listening_topic = args.name
291 self.startup_components()
292
293 if not args.no_heartbeat:
294 self.start_heartbeat()
295 self.start_kafka_cluster_heartbeat(self.instance_id)
296
297 def get_version(self):
298 path = defs['version_file']
299 if not path.startswith('/'):
300 dir = os.path.dirname(os.path.abspath(__file__))
301 path = os.path.join(dir, path)
302
303 path = os.path.abspath(path)
304 version_file = open(path, 'r')
305 v = version_file.read()
306
307 # Use Version to validate the version string - exception will be raised
308 # if the version is invalid
309 Version(v)
310
311 version_file.close()
312 return v
313
314 def start(self):
315 self.start_reactor() # will not return except Keyboard interrupt
316
317 def stop(self):
318 pass
319
320 def get_args(self):
321 """Allow access to command line args"""
322 return self.args
323
324 def get_config(self):
325 """Allow access to content of config file"""
326 return self.config
327
328 def _get_adapter_config(self):
329 cfg = AdapterConfig()
330 return cfg
331
332 @inlineCallbacks
333 def startup_components(self):
334 try:
335 self.log.info('starting-internal-components',
336 consul=self.args.consul,
337 etcd=self.args.etcd)
338
339 registry.register('main', self)
340
341 # Update the logger to output the vcore id.
342 self.log = update_logging(instance_id=self.instance_id,
343 vcore_id=None)
344
345 yield registry.register(
346 'kafka_cluster_proxy',
347 KafkaProxy(
348 self.args.consul,
349 self.args.kafka_cluster,
350 config=self.config.get('kafka-cluster-proxy', {})
351 )
352 ).start()
353
354 config = self._get_adapter_config()
355
356 self.core_proxy = CoreProxy(
357 kafka_proxy=None,
358 core_topic=self.core_topic,
359 my_listening_topic=self.listening_topic)
360
361 ponsim_olt_adapter = PonSimOltAdapter(
362 adapter_agent=self.core_proxy, config=config)
363 ponsim_request_handler = AdapterRequestFacade(
364 adapter=ponsim_olt_adapter)
365
366 yield registry.register(
367 'kafka_adapter_proxy',
368 IKafkaMessagingProxy(
369 kafka_host_port=self.args.kafka_adapter,
370 # TODO: Add KV Store object reference
371 kv_store=self.args.backend,
372 default_topic=self.args.name,
373 # Needs to assign a real class
374 target_cls=ponsim_request_handler
375 )
376 ).start()
377
378 self.core_proxy.kafka_proxy = get_messaging_proxy()
379
380 # retry for ever
381 res = yield self._register_with_core(-1)
382
383 self.log.info('started-internal-services')
384
385 except Exception as e:
386 self.log.exception('Failure-to-start-all-components', e=e)
387
388 @inlineCallbacks
389 def shutdown_components(self):
390 """Execute before the reactor is shut down"""
391 self.log.info('exiting-on-keyboard-interrupt')
392 for component in reversed(registry.iterate()):
393 yield component.stop()
394
395 import threading
396 self.log.info('THREADS:')
397 main_thread = threading.current_thread()
398 for t in threading.enumerate():
399 if t is main_thread:
400 continue
401 if not t.isDaemon():
402 continue
403 self.log.info('joining thread {} {}'.format(
404 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
405 t.join()
406
407 def start_reactor(self):
408 from twisted.internet import reactor
409 reactor.callWhenRunning(
410 lambda: self.log.info('twisted-reactor-started'))
411 reactor.addSystemEventTrigger('before', 'shutdown',
412 self.shutdown_components)
413 reactor.run()
414
415 @inlineCallbacks
416 def _register_with_core(self, retries):
417 # Send registration to Core with adapter specs
418 adapter = Adapter()
419 adapter.id = self.args.name
420 adapter.vendor = self.args.name
421 adapter.version = self.ponsim_olt_adapter_version
422 while 1:
423 try:
424 resp = yield self.core_proxy.register(adapter)
425 self.log.info('registration-response', response=resp)
426 returnValue(resp)
427 except TimeOutError as e:
428 self.log.warn("timeout-when-registering-with-core", e=e)
429 if retries == 0:
430 self.log.exception("no-more-retries", e=e)
431 raise
432 else:
433 retries = retries if retries < 0 else retries - 1
434 yield asleep(defs['retry_interval'])
435 except Exception as e:
436 self.log.exception("failed-registration", e=e)
437 raise
438
439 def start_heartbeat(self):
440
441 t0 = time.time()
442 t0s = time.ctime(t0)
443
444 def heartbeat():
445 self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
446
447 lc = LoopingCall(heartbeat)
448 lc.start(10)
449
450 # Temporary function to send a heartbeat message to the external kafka
451 # broker
452 def start_kafka_cluster_heartbeat(self, instance_id):
453 # For heartbeat we will send a message to a specific "voltha-heartbeat"
454 # topic. The message is a protocol buf
455 # message
456 message = dict(
457 type='heartbeat',
458 adapter=self.args.name,
459 instance=instance_id,
460 ip=get_my_primary_local_ipv4()
461 )
462 topic = defs['heartbeat_topic']
463
464 def send_msg(start_time):
465 try:
466 kafka_cluster_proxy = get_kafka_proxy()
467 if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
468 # self.log.debug('kafka-proxy-available')
469 message['ts'] = arrow.utcnow().timestamp
470 message['uptime'] = time.time() - start_time
471 # self.log.debug('start-kafka-heartbeat')
472 kafka_cluster_proxy.send_message(topic, dumps(message))
473 else:
474 self.log.error('kafka-proxy-unavailable')
475 except Exception, e:
476 self.log.exception('failed-sending-message-heartbeat', e=e)
477
478 try:
479 t0 = time.time()
480 lc = LoopingCall(send_msg, t0)
481 lc.start(10)
482 except Exception, e:
483 self.log.exception('failed-kafka-heartbeat', e=e)
484
485
486if __name__ == '__main__':
487 Main().start()