blob: 06210179c1e207995d5f02a1e08f509638bceb51 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
"""OpenOLT Adapter main entry point"""
19
20import argparse
21import os
22import time
23
24import arrow
25import yaml
26from packaging.version import Version
27from simplejson import dumps
28from twisted.internet.defer import inlineCallbacks, returnValue
29from twisted.internet.task import LoopingCall
30from zope.interface import implementer
31
William Kurkian44cd7bb2019-02-11 16:39:12 -050032from pyvoltha.common.structlog_setup import setup_logging, update_logging
33from pyvoltha.common.utils.asleep import asleep
34from pyvoltha.common.utils.deferred_utils import TimeOutError
35from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
36from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
William Kurkian6f436d02019-02-06 16:25:01 -050037 get_my_primary_interface
William Kurkian44cd7bb2019-02-11 16:39:12 -050038from pyvoltha.common.utils.registry import registry, IComponent
39from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
40from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
41from pyvoltha.adapters.kafka.core_proxy import CoreProxy
42from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
William Kurkian6f436d02019-02-06 16:25:01 -050043 get_messaging_proxy
William Kurkian44cd7bb2019-02-11 16:39:12 -050044from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian6f436d02019-02-06 16:25:01 -050045from openolt import OpenoltAdapter
William Kurkian44cd7bb2019-02-11 16:39:12 -050046from pyvoltha.protos import third_party
47from pyvoltha.protos.adapter_pb2 import AdapterConfig
William Kurkian6f436d02019-02-06 16:25:01 -050048
49_ = third_party
50
# Default settings for the adapter.  Most entries can be overridden through
# the environment variable named in the matching os.environ.get() call, and
# parse_args() uses many of them as command-line option defaults.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './openolt.yml'),
    # Raw string: the pattern contains regex escapes ('\.') that are not
    # valid Python string escapes.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'openolt'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openoltolt'),
    # NOTE: when either variable below is set, the value is the raw string
    # taken from the environment rather than a bool.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are always strings; convert so the delay handed to
    # asleep() is numeric whether or not RETRY_INTERVAL is set.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
72
73
def parse_args(argv=None):
    """Build the command-line parser and parse the adapter's options.

    Option defaults are taken from the module-level ``defs`` dictionary.

    :param argv: list of argument strings to parse.  ``None`` (the default)
        makes argparse read ``sys.argv[1:]``.
    :return: the populated ``argparse.Namespace``.  When
        ``--instance-id-is-container-name`` is given, ``instance_id`` is
        replaced by the result of ``get_my_containers_name()``.
    """

    def flag(value):
        """Interpret an option string as a boolean.

        argparse also applies ``type`` to string defaults, so a value coming
        from the environment is converted the same way as one given on the
        command line.  Unrecognised words are treated as true, so no
        previously accepted value is rejected.
        """
        return value.strip().lower() not in (
            '', '0', 'false', 'f', 'no', 'n', 'off')

    parser = argparse.ArgumentParser()

    # Help strings use argparse's own %(default)s expansion.  Pre-formatting
    # them with '%' would make argparse re-interpret any '%' contained in a
    # default value when rendering --help.
    parser.add_argument(
        '-c', '--config',
        dest='config',
        action='store',
        default=defs['config'],
        help='Path to openolt.yml config file (default: %(default)s). '
             'If relative, it is relative to main.py of openolt adapter.')

    parser.add_argument(
        '-X', '--container-number-extractor',
        dest='container_name_regex',
        action='store',
        default=defs['container_name_regex'],
        help='Regular expression for extracting container number from '
             'container name (default: %(default)s)')

    parser.add_argument(
        '-C', '--consul',
        dest='consul',
        action='store',
        default=defs['consul'],
        help='<hostname>:<port> to consul agent (default: %(default)s)')

    parser.add_argument(
        '-na', '--name',
        dest='name',
        action='store',
        default=defs['name'],
        help='name of this adapter (default: %(default)s)')

    parser.add_argument(
        '-ven', '--vendor',
        dest='vendor',
        action='store',
        default=defs['vendor'],
        help='vendor of this adapter (default: %(default)s)')

    parser.add_argument(
        '-dt', '--device_type',
        dest='device_type',
        action='store',
        default=defs['device_type'],
        help='supported device type of this adapter (default: %(default)s)')

    parser.add_argument(
        '-abf', '--accept_bulk_flow',
        dest='accept_bulk_flow',
        action='store',
        type=flag,
        default=defs['accept_bulk_flow'],
        help='specifies whether the device type accepts bulk flow updates '
             'adapter (default: %(default)s)')

    parser.add_argument(
        '-aaf', '--accept_atomic_flow',
        dest='accept_atomic_flow',
        action='store',
        type=flag,
        default=defs['accept_atomic_flow'],
        help='specifies whether the device type accepts add/remove flow '
             '(default: %(default)s)')

    parser.add_argument(
        '-e', '--etcd',
        dest='etcd',
        action='store',
        default=defs['etcd'],
        help='<hostname>:<port> to etcd server (default: %(default)s)')

    parser.add_argument(
        '-i', '--instance-id',
        dest='instance_id',
        action='store',
        default=defs['instance_id'],
        help='unique string id of this container instance '
             '(default: %(default)s)')

    parser.add_argument(
        '-I', '--interface',
        dest='interface',
        action='store',
        default=defs['interface'],
        help='ETH interface to receive (default: %(default)s)')

    parser.add_argument(
        '-n', '--no-banner',
        dest='no_banner',
        action='store_true',
        default=False,
        help='omit startup banner log lines')

    parser.add_argument(
        '-N', '--no-heartbeat',
        dest='no_heartbeat',
        action='store_true',
        default=False,
        help='do not emit periodic heartbeat log messages')

    # -q and -v are counters with no default, so they are None when absent.
    parser.add_argument(
        '-q', '--quiet',
        dest='quiet',
        action='count',
        help="suppress debug and info logs")

    parser.add_argument(
        '-v', '--verbose',
        dest='verbose',
        action='count',
        help='enable verbose logging')

    parser.add_argument(
        '--instance-id-is-container-name',
        dest='instance_id_is_container_name',
        action='store_true',
        default=False,
        help='use docker container name as container instance id'
             ' (overrides -i/--instance-id option)')

    parser.add_argument(
        '-KA', '--kafka_adapter',
        dest='kafka_adapter',
        action='store',
        default=defs['kafka_adapter'],
        help='<hostname>:<port> of the kafka adapter broker '
             '(default: %(default)s). (If not specified (None), the '
             'address from the config file is used)')

    parser.add_argument(
        '-KC', '--kafka_cluster',
        dest='kafka_cluster',
        action='store',
        default=defs['kafka_cluster'],
        help='<hostname>:<port> of the kafka cluster broker '
             '(default: %(default)s). (If not specified (None), the '
             'address from the config file is used)')

    parser.add_argument(
        '-b', '--backend',
        default=defs['backend'],
        choices=['none', 'consul', 'etcd'],
        help='backend to use for config persistence')

    parser.add_argument(
        '-ct', '--core_topic',
        dest='core_topic',
        action='store',
        default=defs['core_topic'],
        help='topic of core on the kafka bus')

    args = parser.parse_args(argv)

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
236
237
def load_config(args):
    """Load the adapter's YAML configuration file.

    :param args: parsed command-line namespace; only ``args.config`` is read.
    :return: the parsed YAML document, or an empty dict when the file holds
        no document (so callers can use ``.get()`` on the result).
    """
    path = args.config
    # Only paths starting with '.' are resolved against this file's
    # directory; any other relative path is resolved against the current
    # working directory by abspath().
    # NOTE(review): the --config help text says relative paths are relative
    # to main.py - confirm which behaviour is intended.
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)

    with open(path) as fd:
        # safe_load builds plain Python objects only.  yaml.load() without
        # an explicit Loader can construct arbitrary objects and is rejected
        # by recent PyYAML releases.
        config = yaml.safe_load(fd)
    return {} if config is None else config
248
249
250
251
def print_banner(log):
    """Write the adapter's ASCII-art startup banner to ``log``.

    :param log: logger object; its ``info`` method is called once per banner
        line, in order.
    """
    # Raw strings keep the backslashes of the art literal instead of relying
    # on unrecognised escape sequences being passed through unchanged.
    # NOTE(review): the spacing is reproduced exactly as found; the art looks
    # mis-aligned - confirm against the intended layout.
    banner = (
        ' ____ _____ ___ _ ___ ___ _ _____ ',
        r'/ _ \| _ \| __| | / | / _ \| | |_ _| ',
        '| | | | | )_| | | | / /| | | | | | | | | ',
        '| |_| | __/ |_|_| |/ / | | |_| | |___| | ',
        r'\____/|_| |___|_| |_| \___/|_____|_| ',
        ' ',
        ' _ _ _ ',
        r' / \ __| | __ _ _ __ | |_ ___ _ __ ',
        r" / _ \ / _` |/ _` | '_ \| __/ _ \ '__| ",
        r' / ___ \ (_| | (_| | |_) | || __/ | ',
        r'/_/ \_\__,_|\__,_| .__/ \__\___|_| ',
        ' |_| ',
        '(to stop: press Ctrl-C)',
    )
    for line in banner:
        log.info(line)
266
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenOLT adapter process.

    Construction parses the command line, loads the configuration, sets up
    logging, kicks off the internal components (kafka proxies, core/adapter
    proxies, the OpenoltAdapter itself) and, unless disabled, starts the
    heartbeat loops.  ``start()`` then runs the Twisted reactor.
    """

    def __init__(self):

        self.args = args = parse_args()
        self.config = load_config(args)

        # -v/-q are counters that are None when not given on the command line.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.openolt_adapter_version = self.get_version()
        self.log.info('Open-OLT-Adapter-Version',
                      version=self.openolt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        self.adapter = None
        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = self._utc_timestamp()
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        # Returns a Deferred that is not waited on here; failures are logged
        # inside startup_components().
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    @staticmethod
    def _utc_timestamp():
        """Return the current time as whole seconds since the Unix epoch.

        Used instead of ``arrow.utcnow().timestamp``: that attribute is a
        property in older arrow releases but a method in arrow >= 1.0, where
        reading it without calling it yields a bound method, not a number.
        """
        return int(time.time())

    def get_version(self):
        """Read, validate and return the adapter version string.

        The file named by ``defs['version_file']`` is resolved against this
        module's directory unless the path starts with '/'.

        :return: the file content with surrounding whitespace removed.
        :raises: whatever ``packaging.version.Version`` raises when the
            content is not a valid version string.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # 'with' closes the file even when validation below raises.
        with open(path, 'r') as version_file:
            v = version_file.read().strip()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)

        return v

    def start(self):
        """Run the reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """Component stop hook; nothing to do for this component."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the internal components, then register with
        the core.

        Any exception raised along the way is logged and not propagated.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # Both proxies are created without a kafka proxy; it is assigned
            # below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic,
                adapter_name=self.args.name)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = OpenoltAdapter(core_proxy=self.core_proxy,
                                          adapter_proxy=self.adapter_proxy,
                                          config=config)

            openolt_request_handler = AdapterRequestFacade(
                adapter=self.adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    group_id_prefix=self.args.instance_id,
                    # Needs to assign a real class
                    target_cls=openolt_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            # NOTE(review): non-daemon threads are skipped, so only daemon
            # threads are joined here - confirm this is not inverted.
            if t is main_thread or not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install the shutdown hook and run the Twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register the adapter with the core, retrying on timeouts.

        :param retries: number of additional attempts allowed after a
            timeout; a negative value retries without limit.
        :return: (via ``returnValue``) the truthy response of
            ``core_proxy.register``.
        :raises TimeOutError: when a timeout occurs and ``retries`` is 0.
        :raises Exception: any other registration failure is logged and
            re-raised.
        """
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                # NOTE(review): a falsy response loops straight back into
                # another register call, with no delay and without consuming
                # a retry - confirm this is intended.
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)
                    returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    retries = retries if retries < 0 else retries - 1
                    # The configured value may be a string when it comes
                    # from the environment; the delay must be numeric.
                    yield asleep(float(defs['retry_interval']))
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Start a looping call that logs a debug status entry every 10s."""

        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Start a looping call that publishes a heartbeat every 10s.

        The message is a dict serialised with ``dumps`` and sent to the
        topic named by ``defs['heartbeat_topic']`` through the kafka proxy
        returned by ``get_kafka_proxy()``.

        :param instance_id: value placed in the message's ``instance`` field.
        """
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    message['ts'] = self._utc_timestamp()
                    message['uptime'] = time.time() - start_time
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
499
500
if __name__ == '__main__':
    # Script entry point: build the adapter's Main component and hand
    # control to it.
    adapter_main = Main()
    adapter_main.start()