blob: a8ad69d18fee6c62ddba100665a666ccf02aca76 [file] [log] [blame]
Scott Bakerbef5fd92019-02-21 10:24:02 -08001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(Empty); all of its
semantics are derived from the recovered schema.
21"""
22
23import os
24import sys
25import time
26from random import randint
27from zlib import decompress
28
29import functools
30import grpc
31from consul import Consul
32from grpc._channel import _Rendezvous
33from structlog import get_logger
34from twisted.internet import reactor
35from twisted.internet.defer import inlineCallbacks, returnValue
36from werkzeug.exceptions import ServiceUnavailable
37
38from protos.schema_pb2_grpc import SchemaServiceStub
39from google.protobuf.empty_pb2 import Empty
40
41from asleep import asleep
42
# Module-wide structlog logger used by GrpcClient below.
log = get_logger()
44
45
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.

    ``start()`` schedules ``connect()`` on the Twisted reactor. ``connect()``
    resolves the endpoint (directly, or through Consul when the endpoint is
    given as '@<service-name>'), opens a channel, downloads and compiles the
    schema, and on failure retries with back-off until it succeeds or
    ``stop()`` has been called. ``invoke()`` then issues calls over the
    established channel.
    """

    # Seconds to wait before successive reconnect attempts; once the list is
    # exhausted the last value is reused for every further attempt.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
                 reconnect_callback=None, credentials=None,
                 restart_on_disconnect=False):
        """
        :param consul_endpoint: 'host:port' of the Consul agent; only used
            when ``endpoint`` starts with '@'.
        :param work_dir: directory the downloaded *.proto / *.desc files are
            written to and compiled in.
        :param endpoint: 'host:port' of the gRPC server, or '@<service-name>'
            to resolve the server through Consul.
        :param reconnect_callback: callable scheduled on the reactor after
            every successful (re-)connect.
        :param credentials: gRPC channel credentials; when given a secure
            channel is opened, otherwise an insecure one.
        :param restart_on_disconnect: when True, re-exec the whole process
            once a previously established connection is lost.
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.credentials = credentials
        self.restart_on_disconnect = restart_on_disconnect

        # protoc plugins (gw_gen.py) shipped next to this module
        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), 'protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False
        # Set once the channel has reached READY; lets the connectivity
        # watcher tell a lost connection apart from one never established.
        self.was_connected = False

    def start(self):
        """Schedule the first connection attempt (if needed); return self."""
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Mark the client as shutting down so that no further (re-)connect
        attempts are made. Calling it again is a no-op.

        NOTE(review): the current channel is not closed here.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_reconnect_callback(self, reconnect_callback):
        """Replace the callback run after each successful connect; return self."""
        self.reconnect_callback = reconnect_callback
        return self

    def connectivity_callback(self, client, connectivity):
        """
        Channel connectivity watcher, subscribed by ``connect()`` when
        ``restart_on_disconnect`` is set.

        :param client: the GrpcClient owning the watched channel
            (``connect()`` binds it to ``self`` via functools.partial).
        :param connectivity: the new grpc.ChannelConnectivity state.
        """
        states = grpc.ChannelConnectivity

        if self.was_connected and connectivity in (states.TRANSIENT_FAILURE,
                                                   states.SHUTDOWN):
            log.info("connectivity lost -- restarting")
            # Replace the running process with a fresh copy of itself.
            os.execv(sys.executable, ['python'] + sys.argv)

        if connectivity == states.READY:
            self.was_connected = True

        # Sometimes gRPC transitions from READY to IDLE, skipping
        # TRANSIENT_FAILURE even though a socket is disconnected. So on idle,
        # force a connectivity check.
        if connectivity == states.IDLE and self.was_connected:
            # The result will probably show IDLE, but passing in True has the
            # side effect of reconnecting if the connection has been lost,
            # which will trigger the TRANSIENT_FAILURE we were looking for.
            client.channel._channel.check_connectivity_state(True)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point.

        On success the schema is downloaded and compiled, ``connected`` is
        set and the reconnect callback is scheduled. On any error the error
        is logged, a back-off delay is awaited and the next attempt is
        scheduled on the reactor (a no-op once ``stop()`` was called).
        """
        if self.shutting_down or self.connected:
            return

        # Bound before the try: endpoint resolution itself may raise, and the
        # generic error handler below logs this value.
        _endpoint = None

        try:
            if self.endpoint.startswith('@'):
                _endpoint = yield self._get_endpoint_from_consul(
                    self.endpoint[1:])
            else:
                _endpoint = self.endpoint

            if self.credentials:
                log.info('securely connecting', endpoint=_endpoint)
                self.channel = grpc.secure_channel(_endpoint, self.credentials)
            else:
                log.info('insecurely connecting', endpoint=_endpoint)
                self.channel = grpc.insecure_channel(_endpoint)

            if self.restart_on_disconnect:
                connectivity_callback = functools.partial(
                    self.connectivity_callback, self)
                self.channel.subscribe(connectivity_callback)

            # Delay between initiating connection and executing first gRPC.
            # See CORD-3012.
            # NOTE(review): time.sleep blocks the reactor for the whole delay
            # (as do the synchronous schema download and compilation below).
            time.sleep(0.5)

            swagger_from = self._retrieve_schema()
            self._compile_proto_files(swagger_from)
            self._clear_backoff()

            self.connected = True
            if self.reconnect_callback is not None:
                reactor.callLater(0, self.reconnect_callback)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception('rendezvous error', e=e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log ``msg`` and return asleep() for the next back-off delay.

        The delay grows with each consecutive failure (see RETRY_BACKOFF);
        callers ``yield`` the returned value inside inlineCallbacks.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter after a successful connect."""
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service-name.

        Polls Consul once per second until at least one instance of the
        service is registered, then returns 'address:port' for a randomly
        chosen instance.
        """
        consul_parts = self.consul_endpoint.split(':')
        host = consul_parts[0].strip()
        port = int(consul_parts[1].strip())

        while True:
            log.debug('consul-lookup', host=host, port=port)
            consul = Consul(host=host, port=port)
            _, services = consul.catalog.service(service_name)
            log.debug('consul-response', services=services)
            if services:
                break
            log.warning('no-service', consul_host=host, consul_port=port,
                        service_name=service_name)
            yield asleep(1.0)

        # NOTE(review): CORD-815 (https://jira.opencord.org/browse/CORD-815)
        # asked for local addresses to be preferred when resolving a service
        # via consul; no such preference is implemented here -- a random
        # instance is picked.
        service = services[randint(0, len(services) - 1)]

        # Consul leaves ServiceAddress empty when the service was registered
        # without an explicit address; the node address applies then.
        address = service['ServiceAddress'] or service['Address']
        endpoint = '{}:{}'.format(address, service['ServicePort'])
        returnValue(endpoint)

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files (and
        their decompressed *.desc descriptors) in the work directory.

        :return: the ``swagger_from`` field of the schema response (a proto
            file name, compared against file names during compilation).
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schemas = stub.GetSchema(Empty(), timeout=120)

        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Clear stale files. The path is always forced beneath /tmp ("safer"):
        # a work_dir outside /tmp maps to a different path under /tmp and is
        # therefore never cleaned.
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            proto_content = proto_file.proto
            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
                      length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                      length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)

        return schemas.swagger_from

    def _compile_proto_files(self, swagger_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py.

        Afterwards every generated *_pb2 module is test-imported so that a
        broken module raises here rather than later.

        :param swagger_from: proto file name reported by the server; only
            logged (as need_swagger) in this implementation.
        :return: None
        """
        chameleon_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '.'
        ))

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:

            need_swagger = fname == swagger_from
            log.debug('compiling', file=fname, need_swagger=need_swagger)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/gw_gen.py '
                '--gw_out=. '
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    chameleon_base_dir,
                    self.plugin_dir,
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            status = os.system(cmd)
            if status != 0:
                # A failed compile leaves no *_pb2.py behind, so the
                # test-import below would not notice it; report it here.
                log.error('compile-failed', file=fname, status=status)
            else:
                log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('_pb2.py')]:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            _ = __import__(modname)

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.

        If the server reports UNAVAILABLE while this client believed it was
        connected, the client reconnects and retries the call (at most
        ``retry`` more times).

        :param stub: Reference to the *_pb2 service stub (called with the
            channel to build a stub instance)
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :param retry: remaining retries after a reconnect
        :return: The response protobuf message and returned trailing metadata
        :raises ServiceUnavailable: when not connected, or when the server is
            unavailable and no retry succeeded.
        :raises _Rendezvous: for any other gRPC error status.
        """
        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except _Rendezvous as e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS,
                    grpc.StatusCode.UNAUTHENTICATED,
                    grpc.StatusCode.PERMISSION_DENIED):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e
327 raise e