blob: 2b2d1e0d8290f30e3641dc22e73bdbe1533c31d1 [file] [log] [blame]
Scott Bakerbef5fd92019-02-21 10:24:02 -08001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
gRPC client that connects to a gRPC server end-point and retrieves the
end-point's schema by calling SchemaService.GetSchema(Empty). All of the
client's semantics are derived from the retrieved schema.
"""
22
from __future__ import absolute_import

import functools
import os
import shutil
import subprocess
import sys
import time
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from google.protobuf.empty_pb2 import Empty
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.error import ConnectError

from .asleep import asleep
from .protos.schema_pb2_grpc import SchemaServiceStub
Scott Bakerbef5fd92019-02-21 10:24:02 -080043
# Module-level structlog logger used by every method of GrpcClient below.
log = get_logger()
45
46
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.

    Life-cycle: ``start()`` schedules ``connect()`` on the reactor;
    ``connect()`` opens the channel, downloads the schema, compiles it and
    then schedules the optional reconnect callback. ``invoke()`` performs a
    call through the current channel and transparently reconnects once when
    the server reports UNAVAILABLE.
    """

    # Seconds to wait before successive (re)connect attempts. Once the list
    # is exhausted the last value keeps being used.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
                 reconnect_callback=None, credentials=None,
                 restart_on_disconnect=False):
        """
        :param consul_endpoint: 'host:port' of the consul agent, only used
            when `endpoint` starts with '@'
        :param work_dir: directory in which the downloaded *.proto / *.desc
            files are stored and compiled
        :param endpoint: either 'host:port' of the gRPC server, or
            '@<service-name>' to resolve the address through consul
        :param reconnect_callback: optional callable (no arguments) scheduled
            on the reactor after every successful connect
        :param credentials: channel credentials handed to
            grpc.secure_channel(); an insecure channel is used when falsy
        :param restart_on_disconnect: when true, subscribe to channel
            connectivity changes and re-exec the process when an established
            connection is lost
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.credentials = credentials
        self.restart_on_disconnect = restart_on_disconnect

        here = os.path.dirname(__file__)
        # Extra include path handed to protoc via '-I'.
        self.google_api_dir = os.path.abspath(os.path.join(here, 'protos'))
        # Location of the gw_gen.py protoc plugin.
        self.plugin_dir = os.path.abspath(
            os.path.join(here, 'protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0            # index into RETRY_BACKOFF
        self.shutting_down = False
        self.connected = False
        self.was_connected = False  # channel has reported READY at least once

    def start(self):
        """
        Schedule the initial connect on the reactor (unless already
        connected).
        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Mark the client as shutting down so that no further connect attempts
        are made. Calling it more than once is harmless.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the callable scheduled after each successful connect.
        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    def connectivity_callback(self, client, connectivity):
        """
        Channel connectivity subscriber (installed by connect() when
        restart_on_disconnect is set).
        :param client: the GrpcClient owning the channel
        :param connectivity: the new channel state reported by gRPC
        """
        # Look the states up on the enum class instead of through the member
        # instance that happens to be passed in.
        states = grpc.ChannelConnectivity

        if self.was_connected and connectivity in (
                states.TRANSIENT_FAILURE, states.SHUTDOWN):
            log.info("connectivity lost -- restarting")
            # Replaces the current process image; does not return.
            os.execv(sys.executable, ['python'] + sys.argv)

        if connectivity == states.READY:
            self.was_connected = True

        # Sometimes gRPC transitions from READY to IDLE, skipping
        # TRANSIENT_FAILURE even though a socket is disconnected. So on idle,
        # force a connectivity check.
        if connectivity == states.IDLE and self.was_connected:
            # The result will probably show IDLE, but passing in True has the
            # side effect of reconnecting if the connection has been lost,
            # which will trigger the TRANSIENT_FAILURE we were looking for.
            client.channel._channel.check_connectivity_state(True)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point: open the channel, download and compile the
        schema, then schedule the reconnect callback. On failure, wait
        according to RETRY_BACKOFF and reschedule itself on the reactor.
        """
        if self.shutting_down or self.connected:
            return

        # Bound before the try block so the error handler below can always
        # log it, even when the consul lookup itself is what raised.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = yield self._get_endpoint_from_consul(
                    self.endpoint[1:])

            if self.credentials:
                log.info('securely connecting', endpoint=_endpoint)
                self.channel = grpc.secure_channel(_endpoint,
                                                   self.credentials)
            else:
                log.info('insecurely connecting', endpoint=_endpoint)
                self.channel = grpc.insecure_channel(_endpoint)

            if self.restart_on_disconnect:
                connectivity_callback = functools.partial(
                    self.connectivity_callback, self)
                self.channel.subscribe(connectivity_callback)

            # Delay between initiating connection and executing first gRPC.
            # See CORD-3012. Note that this is a blocking sleep.
            time.sleep(0.5)

            swagger_from = self._retrieve_schema()
            self._compile_proto_files(swagger_from)
            self._clear_backoff()

            self.connected = True
            if self.reconnect_callback is not None:
                reactor.callLater(0, self.reconnect_callback)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception('rendezvous error', e=e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # Only reached after a failed attempt: try again.
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log `msg` and return a deferred (from asleep) for the next backoff
        interval; also advances the retry counter.
        """
        slot = min(self.retries, len(self.RETRY_BACKOFF) - 1)
        wait_time = self.RETRY_BACKOFF[slot]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter after a successful connect.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service-name.
        Polls once per second until consul returns at least one instance.
        :return: (via returnValue) 'address:port' of one service instance
        """
        fields = self.consul_endpoint.split(':')
        host = fields[0].strip()
        port = int(fields[1].strip())
        consul = Consul(host=host, port=port)

        while True:
            log.debug('consul-lookup', host=host, port=port)
            _, services = consul.catalog.service(service_name)
            log.debug('consul-response', services=services)
            if services:
                break
            log.warning('no-service', consul_host=host, consul_port=port,
                        service_name=service_name)
            yield asleep(1.0)

        # pick local addresses when resolving a service via consul
        # see CORD-815 (https://jira.opencord.org/browse/CORD-815)
        # NOTE(review): the selection below is random and does not look at
        # locality -- confirm whether CORD-815 is still outstanding.
        service = services[randint(0, len(services) - 1)]
        endpoint = '{}:{}'.format(service['ServiceAddress'],
                                  service['ServicePort'])
        returnValue(endpoint)

    def _prepare_work_dir(self):
        """
        Make sure the work directory exists and purge left-overs from a
        previous run.
        """
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Safety net kept from the original shell command: only ever delete
        # below /tmp, whatever work_dir is set to.
        purge_root = '/tmp/%s' % self.work_dir.replace('/tmp/', '')
        if not os.path.isdir(purge_root):
            return

        for entry in os.listdir(purge_root):
            if entry.startswith('.'):
                continue  # the shell glob '*' never matched dot-files either
            victim = os.path.join(purge_root, entry)
            if os.path.isdir(victim) and not os.path.islink(victim):
                shutil.rmtree(victim, ignore_errors=True)
            else:
                try:
                    os.remove(victim)
                except OSError:
                    pass  # best effort, same as 'rm -f'

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files
        (plus their decompressed descriptors as *.desc) in the work
        directory.
        :return: the `swagger_from` field of the schema response
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schemas = stub.GetSchema(Empty(), timeout=120)

        self._prepare_work_dir()

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            proto_content = proto_file.proto
            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
                      length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.debug('saving-descriptor', fname=desc_fname,
                      dir=self.work_dir, length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)

        return schemas.swagger_from

    def _compile_proto_files(self, swagger_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py. Afterwards every generated
        *_pb2 module is test-imported.
        :param swagger_from: name of the proto file flagged as swagger
            source (only used for logging here)
        :return: None
        """
        chameleon_base_dir = os.path.abspath(os.path.dirname(__file__))

        # Environment for the protoc child process: the plugin directory is
        # appended to PATH and PYTHONPATH points at this package directory.
        env = dict(os.environ)
        env['PATH'] = ':'.join(
            [os.environ.get('PATH', os.defpath), self.plugin_dir])
        env['PYTHONPATH'] = chameleon_base_dir

        for fname in os.listdir(self.work_dir):
            if not fname.endswith('.proto'):
                continue

            need_swagger = fname == swagger_from
            log.debug('compiling', file=fname, need_swagger=need_swagger)
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.', '-I', self.google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                '--plugin=protoc-gen-gw=%s/gw_gen.py' % self.plugin_dir,
                '--gw_out=.',
                fname,
            ]
            log.debug('executing', cmd=cmd, file=fname)
            try:
                status = subprocess.call(cmd, cwd=self.work_dir, env=env)
            except OSError:
                # e.g. no 'python' on PATH; keep going like the former
                # os.system() call did, but leave a trace in the log.
                log.exception('compile-launch-failed', file=fname)
                continue
            if status != 0:
                log.error('compile-failed', file=fname, status=status)
                continue
            log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in os.listdir(self.work_dir):
            if not fname.endswith('_pb2.py'):
                continue
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :param retry: how many times the call is re-issued after a reconnect
            triggered by an UNAVAILABLE status
        :return: The response protobuf message and returned trailing metadata
        :raises ConnectError: when not connected, or when the server is
            UNAVAILABLE and the retry did not produce a response
        """
        if not self.connected:
            raise ConnectError()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request,
                                                    metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except _Rendezvous as e:
            error = e
            code = e.code()

            if code == grpc.StatusCode.UNAVAILABLE:
                # Callers see a ConnectError rather than the raw gRPC error.
                error = ConnectError()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        result = yield self.invoke(stub, method_name,
                                                   request, metadata,
                                                   retry=retry - 1)
                        returnValue(result)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS,
                    grpc.StatusCode.UNAUTHENTICATED,
                    grpc.StatusCode.PERMISSION_DENIED):
                pass  # don't log error, these occur naturally

            else:
                log.exception(error)

            raise error