blob: 99b308a5029565a23830dcd4539fea6425af519c [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import logging
7import sys
8
9import grpc
10import psutil
11from common.service_registry import ServiceRegistry
12
13
class GRPCClientManager:
    """
    Hand out a gRPC client for a cloud service and recycle it across calls.

    The client is built lazily by get_client() from the stub callable and a
    channel obtained from ServiceRegistry, then reused until its reuse budget
    is spent or on_grpc_fail() decides to drop it.
    """

    def __init__(
        self, service_name: str, service_stub,
        max_client_reuse: int = 60,
    ):
        """
        Args:
            service_name: service name passed to
                ServiceRegistry.get_rpc_channel when a client is built.
            service_stub: callable invoked with the channel to build the
                client object.
            max_client_reuse: reuse budget of a single client before it is
                replaced.
        """
        self._client = None
        self._service_stub = service_stub
        self._service_name = service_name
        # Number of times the current client has been handed out.
        self._num_client_use = 0
        self._max_client_reuse = max_client_reuse

    def get_client(self):
        """
        get_client returns a grpc client of the specified service in the cloud.
        It returns the recycled client until the client is reset or it has
        already been handed out more than max_client_reuse times, in which
        case a fresh client is built first.
        """
        if self._client is None or \
                self._num_client_use > self._max_client_reuse:
            chan = ServiceRegistry.get_rpc_channel(
                self._service_name,
                ServiceRegistry.CLOUD,
            )
            self._client = self._service_stub(chan)
            self._num_client_use = 0

        self._num_client_use += 1
        return self._client

    def on_grpc_fail(self, err_code) -> None:
        """
        Try to reuse the grpc client if possible. We are yet to fix a
        grpc behavior, where if DNS request blackholes then the DNS request
        is retried infinitely even after the channel is deleted. To prevent
        running out of fds, we try to reuse the channel during such failures
        as much as possible.

        Args:
            err_code: status code of the failed call; anything other than
                grpc.StatusCode.DEADLINE_EXCEEDED drops the client.

        Raises:
            SystemExit: via sys.exit(1) when the process holds at least 80%
                of its finite soft open-file limit.
        """
        if err_code != grpc.StatusCode.DEADLINE_EXCEEDED:
            # Not related to the DNS issue
            self._reset_client()
        if self._num_client_use >= self._max_client_reuse:
            logging.info('Max client reuse reached. Cleaning up client')
            self._reset_client()

        # Sanity check if we are not leaking fds
        proc = psutil.Process()
        soft_limit, _ = proc.rlimit(psutil.RLIMIT_NOFILE)
        open_fds = proc.num_fds()
        logging.info('Num open fds: %d', open_fds)
        if self._fd_limit_reached(open_fds, soft_limit):
            logging.error("Reached 80% of allowed fds. Restarting process")
            sys.exit(1)

    @staticmethod
    def _fd_limit_reached(open_fds: int, max_fds: int) -> bool:
        """Return True when open_fds is at or above 80% of a finite max_fds."""
        # A non-positive limit is treated as "no limit": an unlimited rlimit
        # is normally reported as RLIM_INFINITY (-1), which would otherwise
        # make the threshold negative and kill the process on every failure.
        if max_fds <= 0:
            return False
        return open_fds >= (max_fds * 0.8)

    def _reset_client(self) -> None:
        """Drop the current client and restart the use counter."""
        self._client = None
        self._num_client_use = 0