blob: dbaff760d97219c7ac4fad852b30ae3a6bf2ef0a [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import logging
15import sys
16
17import grpc
18import psutil
19from common.service_registry import ServiceRegistry
20
21
22class GRPCClientManager:
23 def __init__(
24 self, service_name: str, service_stub,
25 max_client_reuse: int = 60,
26 ):
27 self._client = None
28 self._service_stub = service_stub
29 self._service_name = service_name
30 self._num_client_use = 0
31 self._max_client_reuse = max_client_reuse
32
33 def get_client(self):
34 """
35 get_client returns a grpc client of the specified service in the cloud.
36 it will return a recycled client until the client fails or the number
37 of recycling reaches the max_client_use.
38 """
39 if self._client is None or \
40 self._num_client_use > self._max_client_reuse:
41 chan = ServiceRegistry.get_rpc_channel(
42 self._service_name,
43 ServiceRegistry.CLOUD,
44 )
45 self._client = self._service_stub(chan)
46 self._num_client_use = 0
47
48 self._num_client_use += 1
49 return self._client
50
51 def on_grpc_fail(self, err_code):
52 """
53 Try to reuse the grpc client if possible. We are yet to fix a
54 grpc behavior, where if DNS request blackholes then the DNS request
55 is retried infinitely even after the channel is deleted. To prevent
56 running out of fds, we try to reuse the channel during such failures
57 as much as possible.
58 """
59 if err_code != grpc.StatusCode.DEADLINE_EXCEEDED:
60 # Not related to the DNS issue
61 self._reset_client()
62 if self._num_client_use >= self._max_client_reuse:
63 logging.info('Max client reuse reached. Cleaning up client')
64 self._reset_client()
65
66 # Sanity check if we are not leaking fds
67 proc = psutil.Process()
68 max_fds, _ = proc.rlimit(psutil.RLIMIT_NOFILE)
69 open_fds = proc.num_fds()
70 logging.info('Num open fds: %d', open_fds)
71 if open_fds >= (max_fds * 0.8):
72 logging.error("Reached 80% of allowed fds. Restarting process")
73 sys.exit(1)
74
75 def _reset_client(self):
76 self._client = None
77 self._num_client_use = 0