#!/usr/bin/env python
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""OpenOLT Adapter main entry point"""

20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
William Kurkian44cd7bb2019-02-11 16:39:12 -050032from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
William Kurkian6f436d02019-02-06 16:25:01 -050037 get_my_primary_interface
William Kurkian44cd7bb2019-02-11 16:39:12 -050038from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
William Kurkian6f436d02019-02-06 16:25:01 -050043 get_messaging_proxy
William Kurkian44cd7bb2019-02-11 16:39:12 -050044from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian6f436d02019-02-06 16:25:01 -050045from openolt import OpenoltAdapter
William Kurkian8b1690c2019-03-04 16:53:22 -050046#from voltha_protos import third_party
47from voltha_protos.adapter_pb2 import AdapterConfig
William Kurkian6f436d02019-02-06 16:25:01 -050048
William Kurkian8b1690c2019-03-04 16:53:22 -050049#_ = third_party
William Kurkian6f436d02019-02-06 16:25:01 -050050
# Default settings for the adapter. Most entries can be overridden through
# the environment variable named in the corresponding os.environ.get() call,
# and several are used again as argparse defaults in parse_args().
#
# NOTE: os.environ.get() returns a string whenever the variable is set, so
# the boolean-looking defaults (accept_bulk_flow, accept_atomic_flow) become
# strings such as 'False' when they are supplied through the environment.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openolt.yml'),
    # Raw string: the pattern contains regex escapes such as "\.".
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openolt'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openoltolt'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Coerced to a number because this value is handed to asleep() when
    # registration with the core is retried; without the conversion an
    # environment override would arrive there as a string.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
72
73
def parse_args():
    """Build the adapter's command-line parser and parse the process args.

    Value options take their defaults from the module-level ``defs``
    dictionary. When ``--instance-id-is-container-name`` is given, the
    parsed ``instance_id`` is replaced by the result of
    ``get_my_containers_name()``.

    Returns:
        The ``argparse`` namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser()

    def add_store(short_flag, long_flag, key, description):
        # Register a plain value option whose dest and default both come
        # from the ``defs`` entry named by ``key``.
        parser.add_argument(short_flag, long_flag,
                            dest=key,
                            action='store',
                            default=defs[key],
                            help=description)

    def add_switch(key, description, *flags):
        # Register a boolean switch that is off unless given.
        parser.add_argument(*flags,
                            dest=key,
                            action='store_true',
                            default=False,
                            help=description)

    def add_counter(short_flag, long_flag, key, description):
        # Register an option that counts how many times it was given.
        parser.add_argument(short_flag, long_flag,
                            dest=key,
                            action='count',
                            help=description)

    add_store('-c', '--config', 'config',
              'Path to openolt.yml config file (default: %s). '
              'If relative, it is relative to main.py of openolt adapter.'
              % defs['config'])

    add_store('-X', '--container-number-extractor', 'container_name_regex',
              'Regular expression for extracting conatiner number from '
              'container name (default: %s)' % defs['container_name_regex'])

    add_store('-C', '--consul', 'consul',
              '<hostname>:<port> to consul agent (default: %s)'
              % defs['consul'])

    add_store('-na', '--name', 'name',
              'name of this adapter (default: %s)' % defs['name'])

    add_store('-ven', '--vendor', 'vendor',
              'vendor of this adapter (default: %s)' % defs['vendor'])

    add_store('-dt', '--device_type', 'device_type',
              'supported device type of this adapter (default: %s)'
              % defs['device_type'])

    add_store('-abf', '--accept_bulk_flow', 'accept_bulk_flow',
              'specifies whether the device type accepts bulk flow updates '
              'adapter (default: %s)' % defs['accept_bulk_flow'])

    add_store('-aaf', '--accept_atomic_flow', 'accept_atomic_flow',
              'specifies whether the device type accepts add/remove flow '
              '(default: %s)' % defs['accept_atomic_flow'])

    add_store('-e', '--etcd', 'etcd',
              '<hostname>:<port> to etcd server (default: %s)' % defs['etcd'])

    add_store('-i', '--instance-id', 'instance_id',
              'unique string id of this container instance (default: %s)'
              % defs['instance_id'])

    add_store('-I', '--interface', 'interface',
              'ETH interface to recieve (default: %s)' % defs['interface'])

    add_switch('no_banner', 'omit startup banner log lines',
               '-n', '--no-banner')

    add_switch('no_heartbeat', 'do not emit periodic heartbeat log messages',
               '-N', '--no-heartbeat')

    add_counter('-q', '--quiet', 'quiet', "suppress debug and info logs")

    add_counter('-v', '--verbose', 'verbose', 'enable verbose logging')

    add_switch('instance_id_is_container_name',
               'use docker container name as conatiner instance id'
               ' (overrides -i/--instance-id option)',
               '--instance-id-is-container-name')

    add_store('-KA', '--kafka_adapter', 'kafka_adapter',
              '<hostname>:<port> of the kafka adapter broker (default: %s). '
              '(If not specified (None), the address from the config file '
              'is used' % defs['kafka_adapter'])

    add_store('-KC', '--kafka_cluster', 'kafka_cluster',
              '<hostname>:<port> of the kafka cluster broker (default: %s). '
              '(If not specified (None), the address from the config file '
              'is used' % defs['kafka_cluster'])

    # --backend is restricted to a fixed set of choices, so it is
    # registered directly rather than through add_store().
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help='backend to use for config persitence')

    add_store('-ct', '--core_topic', 'core_topic',
              'topic of core on the kafka bus')

    parsed = parser.parse_args()

    # post-processing: the container name takes precedence over
    # -i/--instance-id when explicitly requested
    if parsed.instance_id_is_container_name:
        parsed.instance_id = get_my_containers_name()

    return parsed
236
237
def load_config(args):
    """Load the adapter's YAML configuration file.

    Args:
        args: parsed command-line namespace; only ``args.config`` is read.
            A path starting with ``'.'`` is resolved relative to the
            directory containing this file.

    Returns:
        The parsed configuration, or an empty dict when the file contains
        no YAML document.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)

    with open(path) as fd:
        # safe_load() only builds plain Python types and, unlike a bare
        # yaml.load(fd), does not depend on the optional/mandatory status
        # of the Loader argument across PyYAML releases.
        config = yaml.safe_load(fd)

    # An empty file parses to None; callers use config.get(...), so hand
    # back an empty mapping instead.
    return config if config is not None else {}
248
249
def print_banner(log):
    """Write the ASCII-art startup banner to ``log``.

    Args:
        log: logger-like object; ``log.info`` is called once per banner
            line, in order, followed by the "press Ctrl-C" hint.
    """
    # Raw strings: the art is full of backslashes that must be emitted
    # literally rather than being read as (invalid) escape sequences.
    banner_lines = (
        r' ____ _____ ___ _ ___ ___ _ _____ ',
        r'/ _ \| _ \| __| | / | / _ \| | |_ _| ',
        r'| | | | | )_| | | | / /| | | | | | | | | ',
        r'| |_| | __/ |_|_| |/ / | | |_| | |___| | ',
        r'\____/|_| |___|_| |_| \___/|_____|_| ',
        r' ',
        r' _ _ _ ',
        r' / \ __| | __ _ _ __ | |_ ___ _ __ ',
        r" / _ \ / _` |/ _` | '_ \| __/ _ \ '__| ",
        r' / ___ \ (_| | (_| | |_) | || __/ | ',
        r'/_/ \_\__,_|\__,_| .__/ \__\___|_| ',
        r' |_| ',
        '(to stop: press Ctrl-C)',
    )
    for line in banner_lines:
        log.info(line)
264
Matt Jeanneret9fd36df2019-02-14 19:14:36 -0500265
@implementer(IComponent)
class Main(object):
    """Process-level component that boots the OpenOLT adapter.

    Construction parses the command line, loads the config file, sets up
    logging, kicks off the asynchronous start-up of the kafka proxies and
    the adapter, and (unless disabled) starts the heartbeat loops. Calling
    ``start()`` then runs the Twisted reactor.
    """

    def __init__(self):
        self.args = args = parse_args()
        self.config = load_config(args)

        # Each -v raises and each -q lowers the verbosity adjustment; both
        # options are None when not given on the command line.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.openolt_adapter_version = self.get_version()
        self.log.info('Open-OLT-Adapter-Version',
                      version=self.openolt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        # NOTE(review): ``timestamp`` is read as an attribute here; confirm
        # this matches the arrow release the project pins.
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def get_version(self):
        """Read and validate the adapter version string.

        The path comes from ``defs['version_file']``; unless it starts with
        ``'/'`` it is resolved relative to the directory containing this
        file.

        Returns:
            The contents of the version file exactly as read.

        Raises:
            Whatever ``Version`` raises when the contents are not a valid
            version string, or the I/O error from opening the file.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)

        # The context manager closes the file even if validation fails.
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be
        # raised if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op; present so this object can be stopped like the other
        registered components."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a freshly constructed, unpopulated ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components.

        Steps, in order: register this object as 'main', start the
        kafka cluster proxy, create the core/adapter proxies and the
        ``OpenoltAdapter``, start the kafka adapter messaging proxy, attach
        that messaging proxy to both proxies, then register with the core
        (retrying indefinitely on timeouts).

        Any exception is logged and swallowed, so the returned deferred
        does not fail.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is
            # attached below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = OpenoltAdapter(core_proxy=self.core_proxy,
                                          adapter_proxy=self.adapter_proxy,
                                          config=config)

            self.adapter.start()

            openolt_request_handler = AdapterRequestFacade(
                adapter=self.adapter,
                core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    # Needs to assign a real class
                    target_cls=openolt_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): non-daemon threads are skipped, so only daemon
            # threads are joined here - confirm this is the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the start-up log hook and the shutdown trigger, then run
        the Twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeouts.

        Args:
            retries: number of additional attempts allowed after a
                ``TimeOutError``. A negative value never decrements and so
                retries forever; ``0`` re-raises on the next timeout.

        Returns:
            (via ``returnValue``) the first truthy response from
            ``core_proxy.register``.

        Raises:
            TimeOutError: when a timeout occurs and ``retries`` is 0.
            Exception: any other registration failure is logged and
                re-raised.
        """
        while 1:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                # A falsy response falls through and the loop tries again
                # straight away (no sleep on this path).
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)
                    returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Start a 10-second looping call that logs an 'up' status at debug
        level, together with the start time and the uptime in seconds."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Start a 10-second looping call that publishes a heartbeat.

        The message is a dict (type, adapter name, instance id, local IPv4
        address, timestamp and uptime) serialised with ``dumps`` and sent
        to the topic named by ``defs['heartbeat_topic']``.

        Args:
            instance_id: value placed in the message's ``instance`` field.
        """
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            # Failures are logged rather than raised so that one bad send
            # does not propagate out of the looping call.
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # self.log.debug('kafka-proxy-available')
                    message['ts'] = arrow.utcnow().timestamp
                    message['uptime'] = time.time() - start_time
                    # self.log.debug('start-kafka-heartbeat')
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
500
501
# Script entry point: build the adapter's main component and run it.
if __name__ == '__main__':
    main_component = Main()
    main_component.start()