blob: f7b2404ac042367eebc0cc87995491262a4cd29b [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Rohan Agrawalac066a02020-03-09 12:33:58 +000025import sys
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050026
27import arrow
28import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050029import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050030import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000031
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050032from simplejson import dumps
Neha Sharma61446d32020-03-23 14:32:31 +000033from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034from twisted.internet.task import LoopingCall
35from zope.interface import implementer
36
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000037from pyvoltha.common.structlog_setup import setup_logging, update_logging, string_to_int
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050038from pyvoltha.common.utils.asleep import asleep
39from pyvoltha.common.utils.deferred_utils import TimeOutError
40from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
41from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050042 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050043from pyvoltha.common.utils.registry import registry, IComponent
44from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
45from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
46from pyvoltha.adapters.kafka.core_proxy import CoreProxy
47from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050049from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050050from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050051
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050052from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000053from probe import Probe
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000054from pyvoltha.adapters.log_controller import LogController, KV_STORE_DATA_PATH_PREFIX
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000055
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050056defs = dict(
Matt Jeanneret08a8e862019-12-20 14:02:32 -050057 build_info_file='./BUILDINFO',
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050058 config=os.environ.get('CONFIG', './openonu.yml'),
59 container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
60 '0-9]+)\..*$'),
61 consul=os.environ.get('CONSUL', 'localhost:8500'),
Matteo Scandolobb3317d2020-03-31 10:49:42 -070062 name=os.environ.get('NAME', 'brcm_openomci_onu'),
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050063 vendor=os.environ.get('VENDOR', 'Voltha Project'),
64 device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
65 accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
66 accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
Mahir Gunyel45610b42020-03-16 17:29:01 -070067 accept_incremental_evto_update=os.environ.get('ACCEPT_INCREMENTAL_EVTO_UPDATE', False),
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050068 etcd=os.environ.get('ETCD', 'localhost:2379'),
69 core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
serkant.uluderya28e2c922020-05-30 00:30:12 -070070 adapter_topic=os.environ.get('ADAPTER_TOPIC', 'openolt'),
Devmalya Paulffc89df2019-07-31 17:43:13 -040071 event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050072 interface=os.environ.get('INTERFACE', get_my_primary_interface()),
73 instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
74 kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
75 kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
76 backend=os.environ.get('BACKEND', 'none'),
77 retry_interval=os.environ.get('RETRY_INTERVAL', 2),
78 heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
Rohan Agrawalac066a02020-03-09 12:33:58 +000079 probe=os.environ.get('PROBE', ':8080'),
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000080 log_level=os.environ.get('LOG_LEVEL', 'WARN'),
Matteo Scandolobb3317d2020-03-31 10:49:42 -070081 current_replica=1,
82 total_replicas=1,
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000083 component_name=os.environ.get('COMPONENT_NAME', "adapter-open-onu")
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050084)
85
86
87def parse_args():
88 parser = argparse.ArgumentParser()
89
90 _help = ('Path to openonu.yml config file (default: %s). '
91 'If relative, it is relative to main.py of openonu adapter.'
92 % defs['config'])
93 parser.add_argument('-c', '--config',
94 dest='config',
95 action='store',
96 default=defs['config'],
97 help=_help)
98
99 _help = 'Regular expression for extracting conatiner number from ' \
100 'container name (default: %s)' % defs['container_name_regex']
101 parser.add_argument('-X', '--container-number-extractor',
102 dest='container_name_regex',
103 action='store',
104 default=defs['container_name_regex'],
105 help=_help)
106
107 _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
108 parser.add_argument('-C', '--consul',
109 dest='consul',
110 action='store',
111 default=defs['consul'],
112 help=_help)
113
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700114 # NOTE this is really the adapter type
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500115 _help = 'name of this adapter (default: %s)' % defs['name']
116 parser.add_argument('-na', '--name',
117 dest='name',
118 action='store',
119 default=defs['name'],
120 help=_help)
121
122 _help = 'vendor of this adapter (default: %s)' % defs['vendor']
123 parser.add_argument('-ven', '--vendor',
124 dest='vendor',
125 action='store',
126 default=defs['vendor'],
127 help=_help)
128
129 _help = 'supported device type of this adapter (default: %s)' % defs[
130 'device_type']
131 parser.add_argument('-dt', '--device_type',
132 dest='device_type',
133 action='store',
134 default=defs['device_type'],
135 help=_help)
136
137 _help = 'specifies whether the device type accepts bulk flow updates ' \
138 'adapter (default: %s)' % defs['accept_bulk_flow']
139 parser.add_argument('-abf', '--accept_bulk_flow',
140 dest='accept_bulk_flow',
141 action='store',
142 default=defs['accept_bulk_flow'],
143 help=_help)
144
145 _help = 'specifies whether the device type accepts add/remove flow ' \
146 '(default: %s)' % defs['accept_atomic_flow']
147 parser.add_argument('-aaf', '--accept_atomic_flow',
148 dest='accept_atomic_flow',
149 action='store',
150 default=defs['accept_atomic_flow'],
151 help=_help)
152
Mahir Gunyel45610b42020-03-16 17:29:01 -0700153 _help = 'specifies whether the adapter accepts incremental EVTO updates ' \
154 '(default: %s)' % defs['accept_incremental_evto_update']
155 parser.add_argument('-aie', '--accept_incremental_evto_update',
156 dest='accept_incremental_evto_update',
157 action='store',
158 default=defs['accept_incremental_evto_update'],
159 help=_help)
160
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500161 _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
162 parser.add_argument('-e', '--etcd',
163 dest='etcd',
164 action='store',
165 default=defs['etcd'],
166 help=_help)
167
168 _help = ('unique string id of this container instance (default: %s)'
169 % defs['instance_id'])
170 parser.add_argument('-i', '--instance-id',
171 dest='instance_id',
172 action='store',
173 default=defs['instance_id'],
174 help=_help)
175
176 _help = 'ETH interface to recieve (default: %s)' % defs['interface']
177 parser.add_argument('-I', '--interface',
178 dest='interface',
179 action='store',
180 default=defs['interface'],
181 help=_help)
182
183 _help = 'omit startup banner log lines'
184 parser.add_argument('-n', '--no-banner',
185 dest='no_banner',
186 action='store_true',
187 default=False,
188 help=_help)
189
190 _help = 'do not emit periodic heartbeat log messages'
191 parser.add_argument('-N', '--no-heartbeat',
192 dest='no_heartbeat',
193 action='store_true',
194 default=False,
195 help=_help)
196
Rohan Agrawalac066a02020-03-09 12:33:58 +0000197 _help = 'enable logging'
198 parser.add_argument('-l', '--log_level',
199 dest='log_level',
200 action='store',
201 default=defs['log_level'],
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500202 help=_help)
203
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000204 _help = 'get the component name'
205 parser.add_argument('-cn', '--component_name',
206 dest='env',
207 action='store',
208 help=_help)
209
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500210 _help = ('use docker container name as conatiner instance id'
211 ' (overrides -i/--instance-id option)')
212 parser.add_argument('--instance-id-is-container-name',
213 dest='instance_id_is_container_name',
214 action='store_true',
215 default=False,
216 help=_help)
217
218 _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
219 'If not '
220 'specified (None), the address from the config file is used'
221 % defs['kafka_adapter'])
222 parser.add_argument('-KA', '--kafka_adapter',
223 dest='kafka_adapter',
224 action='store',
225 default=defs['kafka_adapter'],
226 help=_help)
227
228 _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
229 'If not '
230 'specified (None), the address from the config file is used'
231 % defs['kafka_cluster'])
232 parser.add_argument('-KC', '--kafka_cluster',
233 dest='kafka_cluster',
234 action='store',
235 default=defs['kafka_cluster'],
236 help=_help)
237
238 _help = 'backend to use for config persitence'
239 parser.add_argument('-b', '--backend',
240 default=defs['backend'],
241 choices=['none', 'consul', 'etcd'],
242 help=_help)
243
244 _help = 'topic of core on the kafka bus'
245 parser.add_argument('-ct', '--core_topic',
246 dest='core_topic',
247 action='store',
248 default=defs['core_topic'],
249 help=_help)
serkant.uluderya28e2c922020-05-30 00:30:12 -0700250
251 _help = 'topic of openolt adapter on the kafka bus'
252 parser.add_argument('-at', '--adapter_topic',
253 dest='adapter_topic',
254 action='store',
255 default=defs['adapter_topic'],
256 help=_help)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500257
Devmalya Paulffc89df2019-07-31 17:43:13 -0400258 _help = 'topic of events on the kafka bus'
259 parser.add_argument('-et', '--event_topic',
260 dest='event_topic',
261 action='store',
262 default=defs['event_topic'],
263 help=_help)
264
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000265 _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
266 parser.add_argument(
267 '-P', '--probe', dest='probe', action='store',
268 default=defs['probe'],
269 help=_help)
270
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700271 _help = 'Replica number of this particular instance (default: %s)' % defs['current_replica']
272 parser.add_argument(
273 '--currentReplica', dest='current_replica', action='store',
274 default=defs['current_replica'],
275 type=int,
276 help=_help)
277
278 _help = 'Total number of instances for this adapter (default: %s)' % defs['total_replicas']
279 parser.add_argument(
280 '--totalReplicas', dest='total_replicas', action='store',
281 default=defs['total_replicas'],
282 type=int,
283 help=_help)
284
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500285 args = parser.parse_args()
286
287 # post-processing
288
289 if args.instance_id_is_container_name:
290 args.instance_id = get_my_containers_name()
291
292 return args
293
294
295def load_config(args):
296 path = args.config
297 if path.startswith('.'):
298 dir = os.path.dirname(os.path.abspath(__file__))
299 path = os.path.join(dir, path)
300 path = os.path.abspath(path)
301 with open(path) as fd:
302 config = yaml.load(fd)
303 return config
304
305
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500306def get_build_info():
307 path = defs['build_info_file']
308 if not path.startswith('/'):
309 dir = os.path.dirname(os.path.abspath(__file__))
310 path = os.path.join(dir, path)
311 path = os.path.abspath(path)
312 build_info = configparser.ConfigParser()
313 build_info.read(path)
314 results = types.SimpleNamespace(
315 version=build_info.get('buildinfo', 'version', fallback='unknown'),
316 vcs_ref=build_info.get('buildinfo', 'vcs_ref', fallback='unknown'),
317 vcs_dirty=build_info.get('buildinfo', 'vcs_dirty', fallback='unknown'),
318 build_time=build_info.get('buildinfo', 'build_time', fallback='unknown')
319 )
320 return results
321
322
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500323def print_banner(log):
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500324 log.info(' ___________ _____ _ _ _____ _ _ _ _ ')
325 log.info(' | _ | ___ \ ___| \ | | _ | \ | | | | | ')
326 log.info(' | | | | |_/ / |__ | \| | | | | \| | | | | ')
327 log.info(' | | | | __/| __|| . ` | | | | . ` | | | | ')
328 log.info(' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ')
329 log.info(' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ')
330 log.info(' ')
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500331
332
333@implementer(IComponent)
334class Main(object):
335
336 def __init__(self):
337
338 self.args = args = parse_args()
339 self.config = load_config(args)
340
Matteo Scandolod8d73172019-11-26 12:15:15 -0700341 # log levels in python are:
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000342 # 1 - DEBUG => verbosity_adjust = 10
343 # 2 - INFO => verbosity_adjust = 20
344 # 3 - WARNING => verbosity_adjust = 30
345 # 4 - ERROR => verbosity_adjust = 40
346 # 5 - CRITICAL => verbosity_adjust = 50
Matteo Scandolod8d73172019-11-26 12:15:15 -0700347
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000348 verbosity_adjust = string_to_int(str(args.log_level))
349
350 if verbosity_adjust == 0:
351 raise ValueError("Invalid loglevel is given: " + str(args.log_level))
352 sys.exit(1)
Rohan Agrawalac066a02020-03-09 12:33:58 +0000353
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500354 self.log = setup_logging(self.config.get('logging', {}),
355 args.instance_id,
356 verbosity_adjust=verbosity_adjust)
357 self.log.info('container-number-extractor',
358 regex=args.container_name_regex)
359
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500360 self.build_info = get_build_info()
361 self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500362
363 if not args.no_banner:
364 print_banner(self.log)
365
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000366 self.etcd_host = str(args.etcd).split(':')[0]
367 self.etcd_port = str(args.etcd).split(':')[1]
368
369 self.controller = LogController(self.etcd_host, self.etcd_port)
370
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500371 self.adapter = None
372 # Create a unique instance id using the passed-in instance id and
373 # UTC timestamp
374 current_time = arrow.utcnow().timestamp
375 self.instance_id = self.args.instance_id + '_' + str(current_time)
376
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -0500377 self.core_topic = str(args.core_topic)
serkant.uluderya28e2c922020-05-30 00:30:12 -0700378 self.adapter_topic = str(args.adapter_topic)
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -0500379 self.event_topic = str(args.event_topic)
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700380 self.listening_topic = "%s_%s" % (args.name, args.current_replica)
381 self.id = "%s_%s" % (args.name, args.current_replica)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500382 self.startup_components()
383
384 if not args.no_heartbeat:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500385 self.start_kafka_cluster_heartbeat(self.instance_id)
386
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500387 def start(self):
388 self.start_reactor() # will not return except Keyboard interrupt
389
390 def stop(self):
391 pass
392
393 def get_args(self):
394 """Allow access to command line args"""
395 return self.args
396
397 def get_config(self):
398 """Allow access to content of config file"""
399 return self.config
400
401 def _get_adapter_config(self):
402 cfg = AdapterConfig()
403 return cfg
404
405 @inlineCallbacks
406 def startup_components(self):
407 try:
408 self.log.info('starting-internal-components',
409 consul=self.args.consul,
410 etcd=self.args.etcd)
411
412 registry.register('main', self)
413
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500414 yield registry.register(
415 'kafka_cluster_proxy',
416 KafkaProxy(
417 self.args.consul,
418 self.args.kafka_cluster,
419 config=self.config.get('kafka-cluster-proxy', {})
420 )
421 ).start()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000422 Probe.kafka_cluster_proxy_running = True
Neha Sharma61446d32020-03-23 14:32:31 +0000423 Probe.kafka_proxy_faulty = False
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500424
425 config = self._get_adapter_config()
426
427 self.core_proxy = CoreProxy(
428 kafka_proxy=None,
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500429 default_core_topic=self.core_topic,
Devmalya Paulffc89df2019-07-31 17:43:13 -0400430 default_event_topic=self.event_topic,
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500431 my_listening_topic=self.listening_topic)
432
433 self.adapter_proxy = AdapterProxy(
434 kafka_proxy=None,
serkant.uluderya28e2c922020-05-30 00:30:12 -0700435 adapter_topic=self.adapter_topic,
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500436 my_listening_topic=self.listening_topic)
437
438 self.adapter = BrcmOpenomciOnuAdapter(
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700439 id=self.id,
440 core_proxy=self.core_proxy,
441 adapter_proxy=self.adapter_proxy,
Matt Jeanneret08a8e862019-12-20 14:02:32 -0500442 config=config,
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700443 build_info=self.build_info,
444 current_replica=self.args.current_replica,
445 total_replicas=self.args.total_replicas,
446 endpoint=self.listening_topic
447 )
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500448
Matt Jeannereta32441c2019-03-07 05:16:37 -0500449 self.adapter.start()
450
Matt Jeanneretc288ee42019-02-28 13:31:59 -0500451 openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
452 core_proxy=self.core_proxy)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500453
454 yield registry.register(
455 'kafka_adapter_proxy',
456 IKafkaMessagingProxy(
457 kafka_host_port=self.args.kafka_adapter,
458 # TODO: Add KV Store object reference
459 kv_store=self.args.backend,
Matteo Scandolobb3317d2020-03-31 10:49:42 -0700460 default_topic=self.listening_topic,
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500461 group_id_prefix=self.args.instance_id,
462 target_cls=openonu_request_handler
463 )
464 ).start()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000465 Probe.kafka_adapter_proxy_running = True
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500466
467 self.core_proxy.kafka_proxy = get_messaging_proxy()
468 self.adapter_proxy.kafka_proxy = get_messaging_proxy()
469
470 # retry for ever
471 res = yield self._register_with_core(-1)
Neha Sharma61446d32020-03-23 14:32:31 +0000472 Probe.adapter_registered_with_core = True
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500473
474 self.log.info('started-internal-services')
475
476 except Exception as e:
477 self.log.exception('Failure-to-start-all-components', e=e)
478
479 @inlineCallbacks
480 def shutdown_components(self):
481 """Execute before the reactor is shut down"""
482 self.log.info('exiting-on-keyboard-interrupt')
483 for component in reversed(registry.iterate()):
484 yield component.stop()
485
Neha Sharma61446d32020-03-23 14:32:31 +0000486 self.server.shutdown()
487
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500488 import threading
489 self.log.info('THREADS:')
490 main_thread = threading.current_thread()
491 for t in threading.enumerate():
492 if t is main_thread:
493 continue
494 if not t.isDaemon():
495 continue
496 self.log.info('joining thread {} {}'.format(
497 t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
498 t.join()
499
500 def start_reactor(self):
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000501 from twisted.internet import reactor, defer
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500502 reactor.callWhenRunning(
503 lambda: self.log.info('twisted-reactor-started'))
504 reactor.addSystemEventTrigger('before', 'shutdown',
505 self.shutdown_components)
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000506 reactor.callInThread(self.start_probe)
Rohan Agrawaldfd5e802020-03-25 20:39:47 +0000507 defer.maybeDeferred(self.controller.start_watch_log_config_change, self.args.instance_id, str(self.args.log_level))
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500508 reactor.run()
509
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000510 def start_probe(self):
511 args = self.args
512 host = args.probe.split(':')[0]
513 port = args.probe.split(':')[1]
Neha Sharma61446d32020-03-23 14:32:31 +0000514 socketserver.TCPServer.allow_reuse_address = True
515 self.server = socketserver.TCPServer((host, int(port)), Probe)
516 self.server.serve_forever()
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000517
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500518 @inlineCallbacks
519 def _register_with_core(self, retries):
520 while 1:
521 try:
522 resp = yield self.core_proxy.register(
523 self.adapter.adapter_descriptor(),
524 self.adapter.device_types())
525 if resp:
526 self.log.info('registered-with-core',
527 coreId=resp.instance_id)
528
529 returnValue(resp)
530 except TimeOutError as e:
531 self.log.warn("timeout-when-registering-with-core", e=e)
532 if retries == 0:
533 self.log.exception("no-more-retries", e=e)
534 raise
535 else:
536 retries = retries if retries < 0 else retries - 1
537 yield asleep(defs['retry_interval'])
538 except Exception as e:
539 self.log.exception("failed-registration", e=e)
540 raise
541
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500542 # Temporary function to send a heartbeat message to the external kafka
543 # broker
544 def start_kafka_cluster_heartbeat(self, instance_id):
545 # For heartbeat we will send a message to a specific "voltha-heartbeat"
546 # topic. The message is a protocol buf
547 # message
548 message = dict(
549 type='heartbeat',
550 adapter=self.args.name,
551 instance=instance_id,
552 ip=get_my_primary_local_ipv4()
553 )
554 topic = defs['heartbeat_topic']
555
Neha Sharma61446d32020-03-23 14:32:31 +0000556 def send_heartbeat_msg():
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500557 try:
558 kafka_cluster_proxy = get_kafka_proxy()
Neha Sharma61446d32020-03-23 14:32:31 +0000559 if kafka_cluster_proxy:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500560 message['ts'] = arrow.utcnow().timestamp
Neha Sharma61446d32020-03-23 14:32:31 +0000561 self.log.debug('sending-kafka-heartbeat-message')
562
563 # Creating a handler to receive the message callbacks
564 df = Deferred()
565 df.addCallback(self.process_kafka_alive_state_update)
566 kafka_cluster_proxy.register_alive_state_update(df)
567 kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500568 else:
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +0000569 Probe.kafka_cluster_proxy_running = False
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500570 self.log.error('kafka-proxy-unavailable')
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -0500571 except Exception as e:
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500572 self.log.exception('failed-sending-message-heartbeat', e=e)
573
Neha Sharma61446d32020-03-23 14:32:31 +0000574 def check_heartbeat_delivery():
575 try:
576 kafka_cluster_proxy = get_kafka_proxy()
577 if kafka_cluster_proxy:
578 kafka_cluster_proxy.check_heartbeat_delivery()
579 except Exception as e:
580 self.log.exception('failed-checking-heartbeat-delivery', e=e)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500581
Neha Sharma61446d32020-03-23 14:32:31 +0000582 def schedule_periodic_heartbeat():
583 try:
584 # Sending the heartbeat message in a loop
585 lc_heartbeat = LoopingCall(send_heartbeat_msg)
586 lc_heartbeat.start(10)
587 # Polling the delivery status more frequently to get early notification
588 lc_poll = LoopingCall(check_heartbeat_delivery)
589 lc_poll.start(2)
590 except Exception as e:
591 self.log.exception('failed-kafka-heartbeat-startup', e=e)
592
593 from twisted.internet import reactor
594 # Delaying heartbeat initially to let kafka connection be established
595 reactor.callLater(5, schedule_periodic_heartbeat)
596
597 # Receiving the callback and updating the probe accordingly
598 def process_kafka_alive_state_update(self, alive_state):
599 self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
600 Probe.kafka_cluster_proxy_running = alive_state
601
602 kafka_cluster_proxy = get_kafka_proxy()
603 if kafka_cluster_proxy:
604 Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500605
606if __name__ == '__main__':
607 Main().start()