blob: eeb253908a0c5156e985bfacec0dab1c40ac8cf0 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""OpenONU Adapter main entry point"""
19
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050020from __future__ import absolute_import
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050021import argparse
22import os
23import time
Matt Jeanneret08a8e862019-12-20 14:02:32 -050024import types
Rohan Agrawalac066a02020-03-09 12:33:58 +000025import sys
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050026
27import arrow
28import yaml
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050029import socketserver
Matt Jeanneret08a8e862019-12-20 14:02:32 -050030import configparser
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000031
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050032from simplejson import dumps
Neha Sharma61446d32020-03-23 14:32:31 +000033from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050034from twisted.internet.task import LoopingCall
35from zope.interface import implementer
36
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000037from pyvoltha.common.structlog_setup import setup_logging, update_logging, string_to_int
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050038from pyvoltha.common.utils.asleep import asleep
39from pyvoltha.common.utils.deferred_utils import TimeOutError
40from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
41from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050042 get_my_primary_interface
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050043from pyvoltha.common.utils.registry import registry, IComponent
44from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
45from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
46from pyvoltha.adapters.kafka.core_proxy import CoreProxy
47from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -050048 get_messaging_proxy
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050049from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkian8235c1e2019-03-05 12:58:28 -050050from voltha_protos.adapter_pb2 import AdapterConfig
Matt Jeanneret72f96fc2019-02-11 10:53:05 -050051
Matt Jeanneret2e3cb8d2019-11-16 09:22:41 -050052from brcm_openomci_onu_adapter import BrcmOpenomciOnuAdapter
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000053from probe import Probe
Rohan Agrawaldfd5e802020-03-25 20:39:47 +000054from pyvoltha.adapters.log_controller import LogController, KV_STORE_DATA_PATH_PREFIX
Rohan Agrawalc5bdbbc2019-11-14 12:39:39 +000055
# Built-in defaults for the adapter. Most entries can be overridden through
# the environment variable named in the corresponding os.environ.get() call.
# NOTE: values that come from the environment are always strings, whereas the
# literal fallbacks below may be bool/int.
defs = dict(
    build_info_file='./BUILDINFO',
    config=os.environ.get('CONFIG', './openonu.yml'),
    # Raw string: the pattern contains regex escapes (\.) that are not valid
    # Python string escapes.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'brcm_openomci_onu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'openonu'),
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    accept_incremental_evto_update=os.environ.get('ACCEPT_INCREMENTAL_EVTO_UPDATE', False),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    adapter_topic=os.environ.get('ADAPTER_TOPIC', 'openolt'),
    event_topic=os.environ.get('EVENT_TOPIC', 'voltha.events'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Used as a sleep duration (seconds) between registration retries, so it
    # must be numeric even when supplied as an environment string.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
    probe=os.environ.get('PROBE', ':8080'),
    log_level=os.environ.get('LOG_LEVEL', 'WARN'),
    current_replica=1,
    total_replicas=1,
    component_name=os.environ.get('COMPONENT_NAME', "adapter-open-onu")
)
85
86
def parse_args():
    """Parse the adapter's command-line options.

    Every option falls back to the matching entry in ``defs`` (which may
    itself come from an environment variable).

    Returns:
        argparse.Namespace: the parsed options. When
        ``--instance-id-is-container-name`` is given, ``instance_id`` is
        replaced by the value of ``get_my_containers_name()``.
    """
    parser = argparse.ArgumentParser()

    _help = ('Path to openonu.yml config file (default: %s). '
             'If relative, it is relative to main.py of openonu adapter.'
             % defs['config'])
    parser.add_argument('-c', '--config',
                        dest='config',
                        action='store',
                        default=defs['config'],
                        help=_help)

    _help = 'Regular expression for extracting conatiner number from ' \
            'container name (default: %s)' % defs['container_name_regex']
    parser.add_argument('-X', '--container-number-extractor',
                        dest='container_name_regex',
                        action='store',
                        default=defs['container_name_regex'],
                        help=_help)

    _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
    parser.add_argument('-C', '--consul',
                        dest='consul',
                        action='store',
                        default=defs['consul'],
                        help=_help)

    # NOTE this is really the adapter type
    _help = 'name of this adapter (default: %s)' % defs['name']
    parser.add_argument('-na', '--name',
                        dest='name',
                        action='store',
                        default=defs['name'],
                        help=_help)

    _help = 'vendor of this adapter (default: %s)' % defs['vendor']
    parser.add_argument('-ven', '--vendor',
                        dest='vendor',
                        action='store',
                        default=defs['vendor'],
                        help=_help)

    _help = 'supported device type of this adapter (default: %s)' % defs[
        'device_type']
    parser.add_argument('-dt', '--device_type',
                        dest='device_type',
                        action='store',
                        default=defs['device_type'],
                        help=_help)

    # The three accept_* flags are booleans. type=str2bool converts both
    # command-line values and string defaults taken from the environment, so
    # that e.g. "false" is not treated as a truthy non-empty string.
    _help = 'specifies whether the device type accepts bulk flow updates ' \
            'adapter (default: %s)' % defs['accept_bulk_flow']
    parser.add_argument('-abf', '--accept_bulk_flow',
                        type=str2bool,
                        dest='accept_bulk_flow',
                        action='store',
                        default=defs['accept_bulk_flow'],
                        help=_help)

    _help = 'specifies whether the device type accepts add/remove flow ' \
            '(default: %s)' % defs['accept_atomic_flow']
    parser.add_argument('-aaf', '--accept_atomic_flow',
                        type=str2bool,
                        dest='accept_atomic_flow',
                        action='store',
                        default=defs['accept_atomic_flow'],
                        help=_help)

    _help = 'specifies whether the adapter accepts incremental EVTO updates ' \
            '(default: %s)' % defs['accept_incremental_evto_update']
    parser.add_argument('-aie', '--accept_incremental_evto_update',
                        type=str2bool,
                        dest='accept_incremental_evto_update',
                        action='store',
                        default=defs['accept_incremental_evto_update'],
                        help=_help)

    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
    parser.add_argument('-e', '--etcd',
                        dest='etcd',
                        action='store',
                        default=defs['etcd'],
                        help=_help)

    _help = ('unique string id of this container instance (default: %s)'
             % defs['instance_id'])
    parser.add_argument('-i', '--instance-id',
                        dest='instance_id',
                        action='store',
                        default=defs['instance_id'],
                        help=_help)

    _help = 'ETH interface to recieve (default: %s)' % defs['interface']
    parser.add_argument('-I', '--interface',
                        dest='interface',
                        action='store',
                        default=defs['interface'],
                        help=_help)

    _help = 'omit startup banner log lines'
    parser.add_argument('-n', '--no-banner',
                        dest='no_banner',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'do not emit periodic heartbeat log messages'
    parser.add_argument('-N', '--no-heartbeat',
                        dest='no_heartbeat',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = 'enable logging'
    parser.add_argument('-l', '--log_level',
                        dest='log_level',
                        action='store',
                        default=defs['log_level'],
                        help=_help)

    # NOTE(review): the value is stored as ``args.env`` (not
    # ``args.component_name``) and has no default, so
    # defs['component_name'] is not consulted here. Left unchanged because
    # external readers of ``args.env`` cannot be seen from this file.
    _help = 'get the component name'
    parser.add_argument('-cn', '--component_name',
                        dest='env',
                        action='store',
                        help=_help)

    _help = ('use docker container name as conatiner instance id'
             ' (overrides -i/--instance-id option)')
    parser.add_argument('--instance-id-is-container-name',
                        dest='instance_id_is_container_name',
                        action='store_true',
                        default=False,
                        help=_help)

    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
             '(If not specified (None), the address from the config file is '
             'used)' % defs['kafka_adapter'])
    parser.add_argument('-KA', '--kafka_adapter',
                        dest='kafka_adapter',
                        action='store',
                        default=defs['kafka_adapter'],
                        help=_help)

    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
             '(If not specified (None), the address from the config file is '
             'used)' % defs['kafka_cluster'])
    parser.add_argument('-KC', '--kafka_cluster',
                        dest='kafka_cluster',
                        action='store',
                        default=defs['kafka_cluster'],
                        help=_help)

    _help = 'backend to use for config persitence'
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help=_help)

    _help = 'topic of core on the kafka bus'
    parser.add_argument('-ct', '--core_topic',
                        dest='core_topic',
                        action='store',
                        default=defs['core_topic'],
                        help=_help)

    _help = 'topic of openolt adapter on the kafka bus'
    parser.add_argument('-at', '--adapter_topic',
                        dest='adapter_topic',
                        action='store',
                        default=defs['adapter_topic'],
                        help=_help)

    _help = 'topic of events on the kafka bus'
    parser.add_argument('-et', '--event_topic',
                        dest='event_topic',
                        action='store',
                        default=defs['event_topic'],
                        help=_help)

    _help = '<hostname>:<port> for liveness and readiness probes (default: %s)' % defs['probe']
    parser.add_argument(
        '-P', '--probe', dest='probe', action='store',
        default=defs['probe'],
        help=_help)

    _help = 'Replica number of this particular instance (default: %s)' % defs['current_replica']
    parser.add_argument(
        '--currentReplica', dest='current_replica', action='store',
        default=defs['current_replica'],
        type=int,
        help=_help)

    _help = 'Total number of instances for this adapter (default: %s)' % defs['total_replicas']
    parser.add_argument(
        '--totalReplicas', dest='total_replicas', action='store',
        default=defs['total_replicas'],
        type=int,
        help=_help)

    args = parser.parse_args()

    # post-processing

    if args.instance_id_is_container_name:
        args.instance_id = get_my_containers_name()

    return args
294
295
def load_config(args):
    """Load the adapter's YAML configuration file.

    Args:
        args: parsed command-line options; only ``args.config`` (a path
            string) is read.

    Returns:
        The parsed YAML document, or an empty dict when the file contains
        no document, so callers can always use ``.get()`` on the result.

    A path starting with '.' is resolved against the directory containing
    this module; any other path is resolved with ``os.path.abspath``.
    """
    path = args.config
    if path.startswith('.'):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load: yaml.load() without an explicit Loader is deprecated and
        # can construct arbitrary objects; a config file needs only plain
        # YAML types.
        config = yaml.safe_load(fd)
    return config if config is not None else {}
305
306
def get_build_info():
    """Read build metadata from the BUILDINFO file.

    The file location comes from ``defs['build_info_file']``; a path that
    does not start with '/' is resolved against this module's directory.

    Returns:
        types.SimpleNamespace with the attributes ``version``, ``vcs_ref``,
        ``vcs_dirty`` and ``build_time``, each taken from the ``buildinfo``
        section and defaulting to 'unknown' when unavailable.
    """
    info_path = defs['build_info_file']
    if not info_path.startswith('/'):
        module_dir = os.path.dirname(os.path.abspath(__file__))
        info_path = os.path.join(module_dir, info_path)
    info_path = os.path.abspath(info_path)

    parser = configparser.ConfigParser()
    parser.read(info_path)

    field_names = ('version', 'vcs_ref', 'vcs_dirty', 'build_time')
    values = {
        field: parser.get('buildinfo', field, fallback='unknown')
        for field in field_names
    }
    return types.SimpleNamespace(**values)
322
323
def str2bool(v):
    """Convert a textual flag into a bool (usable as an argparse ``type``).

    Args:
        v: a bool (returned unchanged) or a string, compared
            case-insensitively.

    Returns:
        True for 'yes', 'true', 't', 'y', '1';
        False for 'no', 'false', 'f', 'n', '0'.

    Raises:
        argparse.ArgumentTypeError: for any other string.
    """
    if isinstance(v, bool):
        return v

    token = v.lower()
    if token in ('yes', 'true', 't', 'y', '1'):
        return True
    if token in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('bool-expected')
333
334
def print_banner(log):
    """Emit the start-up ASCII-art banner, one ``log.info`` call per line.

    Args:
        log: any object exposing ``info(message)``.
    """
    # Raw strings: the art contains backslashes that are not valid string
    # escapes (e.g. "\ ", "\|", "\_"); raw literals keep the same text
    # without triggering invalid-escape warnings.
    banner_lines = (
        r' ___________ _____ _ _ _____ _ _ _ _ ',
        r' | _ | ___ \ ___| \ | | _ | \ | | | | | ',
        r' | | | | |_/ / |__ | \| | | | | \| | | | | ',
        r' | | | | __/| __|| . ` | | | | . ` | | | | ',
        r' \ \_/ / | | |___| |\ \ \_/ / |\ | |_| | ',
        r' \___/\_| \____/\_| \_/\___/\_| \_/\___/ ',
        r' ',
    )
    for line in banner_lines:
        log.info(line)
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500343
344
@implementer(IComponent)
class Main(object):
    """Top-level component of the OpenONU adapter process.

    Constructing an instance parses the command line, loads the config
    file, sets up logging, creates the log controller and launches
    startup_components(). start() then runs the Twisted reactor.
    """

    def __init__(self):
        """Parse options, configure logging and begin component start-up.

        Raises:
            ValueError: if the configured log level maps to 0, which this
                code treats as an unrecognised level.
        """
        self.args = args = parse_args()
        self.config = load_config(args)

        # log levels in python are:
        # 1 - DEBUG => verbosity_adjust = 10
        # 2 - INFO => verbosity_adjust = 20
        # 3 - WARNING => verbosity_adjust = 30
        # 4 - ERROR => verbosity_adjust = 40
        # 5 - CRITICAL => verbosity_adjust = 50

        verbosity_adjust = string_to_int(str(args.log_level))

        if verbosity_adjust == 0:
            raise ValueError("Invalid loglevel is given: " + str(args.log_level))

        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        self.build_info = get_build_info()
        self.log.info('OpenONU-Adapter-Version', build_version=self.build_info)

        if not args.no_banner:
            print_banner(self.log)

        # args.etcd is expected in "<host>:<port>" form; the port is kept as
        # a string, exactly as it is handed to LogController.
        etcd_endpoint = str(args.etcd).split(':')
        self.etcd_host = etcd_endpoint[0]
        self.etcd_port = etcd_endpoint[1]

        self.controller = LogController(self.etcd_host, self.etcd_port)

        self.adapter = None
        # Set by start_probe() once the probe TCP server has been created;
        # stays None if the probe thread never got that far.
        self.server = None

        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        # NOTE(review): relies on arrow's ``timestamp`` being an attribute
        # rather than a method - confirm against the pinned arrow version.
        current_time = arrow.utcnow().timestamp
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = str(args.core_topic)
        self.adapter_topic = str(args.adapter_topic)
        self.event_topic = str(args.event_topic)
        self.listening_topic = "%s_%s" % (args.name, args.current_replica)
        self.id = "%s_%s" % (args.name, args.current_replica)
        self.startup_components()

        if not args.no_heartbeat:
            self.start_kafka_cluster_heartbeat(self.instance_id)

    def start(self):
        """Run the Twisted reactor (blocks until the reactor stops)."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """No-op; present so Main can be stopped like other components."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed AdapterConfig message."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Create, register and start the adapter's internal components.

        Order: kafka cluster proxy, core/adapter proxies, the ONU adapter,
        the kafka messaging proxy, then registration with the core (retried
        indefinitely). Probe flags are updated as each stage completes.
        Any exception is logged and swallowed, so the returned Deferred
        never fails.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()
            Probe.kafka_cluster_proxy_running = True
            Probe.kafka_proxy_faulty = False

            config = self._get_adapter_config()

            # Both proxies start without a kafka proxy; it is attached below
            # once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                default_core_topic=self.core_topic,
                default_event_topic=self.event_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                adapter_topic=self.adapter_topic,
                my_listening_topic=self.listening_topic)

            self.adapter = BrcmOpenomciOnuAdapter(
                id=self.id,
                core_proxy=self.core_proxy,
                adapter_proxy=self.adapter_proxy,
                config=config,
                build_info=self.build_info,
                current_replica=self.args.current_replica,
                total_replicas=self.args.total_replicas,
                endpoint=self.listening_topic
            )

            self.adapter.start()

            openonu_request_handler = AdapterRequestFacade(adapter=self.adapter,
                                                           core_proxy=self.core_proxy)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.listening_topic,
                    group_id_prefix=self.args.instance_id,
                    target_cls=openonu_request_handler
                )
            ).start()
            Probe.kafka_adapter_proxy_running = True

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)
            Probe.adapter_registered_with_core = True

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        # The probe server only exists once start_probe() has created it.
        probe_server = getattr(self, 'server', None)
        if probe_server is not None:
            probe_server.shutdown()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # Only daemon threads are joined here; non-daemon ones are skipped.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Wire up reactor hooks, start the probe thread and run the reactor."""
        from twisted.internet import reactor, defer
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.callInThread(self.start_probe)
        defer.maybeDeferred(self.controller.start_watch_log_config_change, self.args.instance_id, str(self.args.log_level))
        reactor.run()

    def start_probe(self):
        """Serve the Probe handler on the "<host>:<port>" given by --probe.

        Blocks in serve_forever(); start_reactor() runs it in a reactor
        thread.
        """
        endpoint = self.args.probe.split(':')
        host = endpoint[0]
        port = endpoint[1]
        socketserver.TCPServer.allow_reuse_address = True
        self.server = socketserver.TCPServer((host, int(port)), Probe)
        self.server.serve_forever()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter with the core, retrying on timeouts.

        Args:
            retries: number of additional attempts allowed after a
                TimeOutError; a negative value retries without limit.

        Returns (via the Deferred):
            The response from ``core_proxy.register``.

        Raises:
            TimeOutError: when a timeout occurs and ``retries`` is 0.
            Exception: any other registration failure is logged and
                re-raised.
        """
        while True:
            try:
                resp = yield self.core_proxy.register(
                    self.adapter.adapter_descriptor(),
                    self.adapter.device_types())
                if resp:
                    self.log.info('registered-with-core',
                                  coreId=resp.instance_id)

                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative counts are left untouched (retry forever).
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Schedule periodic heartbeat messages to the kafka cluster.

        After an initial 5 second delay, a heartbeat is sent every 10
        seconds and its delivery status is polled every 2 seconds.

        Args:
            instance_id: value placed in the heartbeat's ``instance`` field.
        """
        # For heartbeat we will send a message to a specific "voltha-heartbeat"
        # topic. The message is a protocol buf
        # message
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_heartbeat_msg():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    message['ts'] = arrow.utcnow().timestamp
                    self.log.debug('sending-kafka-heartbeat-message')

                    # Creating a handler to receive the message callbacks
                    df = Deferred()
                    df.addCallback(self.process_kafka_alive_state_update)
                    kafka_cluster_proxy.register_alive_state_update(df)
                    kafka_cluster_proxy.send_heartbeat_message(topic, dumps(message))
                else:
                    Probe.kafka_cluster_proxy_running = False
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        def check_heartbeat_delivery():
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy:
                    kafka_cluster_proxy.check_heartbeat_delivery()
            except Exception as e:
                self.log.exception('failed-checking-heartbeat-delivery', e=e)

        def schedule_periodic_heartbeat():
            try:
                # Sending the heartbeat message in a loop
                lc_heartbeat = LoopingCall(send_heartbeat_msg)
                lc_heartbeat.start(10)
                # Polling the delivery status more frequently to get early notification
                lc_poll = LoopingCall(check_heartbeat_delivery)
                lc_poll.start(2)
            except Exception as e:
                self.log.exception('failed-kafka-heartbeat-startup', e=e)

        from twisted.internet import reactor
        # Delaying heartbeat initially to let kafka connection be established
        reactor.callLater(5, schedule_periodic_heartbeat)

    # Receiving the callback and updating the probe accordingly
    def process_kafka_alive_state_update(self, alive_state):
        """Mirror the kafka proxy's alive/faulty state onto the Probe flags.

        Args:
            alive_state: value stored in ``Probe.kafka_cluster_proxy_running``.
        """
        self.log.debug('process-kafka-alive-state-update', alive_state=alive_state)
        Probe.kafka_cluster_proxy_running = alive_state

        kafka_cluster_proxy = get_kafka_proxy()
        if kafka_cluster_proxy:
            Probe.kafka_proxy_faulty = kafka_cluster_proxy.is_faulty()
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -0500617
Girish Gowdra68613fc2020-06-23 20:14:50 -0700618
if __name__ == '__main__':
    # Script entry point: build the adapter's Main component, then hand
    # control to its start() method.
    adapter_main = Main()
    adapter_main.start()