blob: 9b68912d33c55600c1fee0fd6fa64f6f05f8c92d [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(Empty). All of the
client's semantics are derived from the retrieved schema.
21"""
Zsolt Harasztie7b60762016-10-05 17:49:27 -070022
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from chameleon.protos import third_party
from chameleon.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
40
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070041
# Module-wide structlog logger used by GrpcClient for all of its events.
log = get_logger()
43
44
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.
    """

    # Seconds to wait before successive reconnect attempts; once the number
    # of retries exceeds the list length the last entry keeps being used.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
                 reconnect_callback=None):
        """
        :param consul_endpoint: 'host:port' of consul; only consulted when
            endpoint is given in the '@<service-name>' form
        :param work_dir: directory receiving the downloaded *.proto and
            *.desc files as well as the python modules generated from them
        :param endpoint: literal 'host:port' of the gRPC server, or
            '@<service-name>' to have the address looked up in consul
        :param reconnect_callback: optional callable scheduled on the
            reactor after every successful (re-)connect
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        """
        Schedule the first connection attempt on the reactor, unless the
        client is already connected.
        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as shutting down so that connect() stops trying.
        Calling it more than once is harmless.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the callable scheduled after each successful (re-)connect.
        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point: resolve the address, open the channel,
        download and compile the schema. On failure, wait according to
        RETRY_BACKOFF and schedule another attempt on the reactor.
        """

        if self.shutting_down or self.connected:
            return

        # Bound before the try block so the error handler below can always
        # log it, even when the consul lookup itself is what failed.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = yield self._get_endpoint_from_consul(
                    self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            swagger_from = self._retrieve_schema()
            self._compile_proto_files(swagger_from)
            self._clear_backoff()

            self.connected = True
            if self.reconnect_callback is not None:
                reactor.callLater(0, self.reconnect_callback)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # Only reached after a failed attempt: try again.
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log msg as an error and start the wait for the next retry.
        :param msg: event name to log
        :return: whatever asleep() returns for the chosen wait time
            (presumably a Deferred, since callers yield on it)
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging how many retries were needed.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service-name.
        Polls consul once a second until at least one entry is registered.
        :param service_name: name of the service in the consul catalog
        :return: (via returnValue) the chosen endpoint as 'address:port'
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        while True:
            log.debug('consul-lookup', host=host, port=port)
            consul = Consul(host=host, port=port)
            _, services = consul.catalog.service(service_name)
            log.debug('consul-response', services=services)
            if services:
                break
            log.warning('no-service', consul_host=host, consul_port=port,
                        service_name=service_name)
            yield asleep(1.0)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.

        service = services[randint(0, len(services) - 1)]
        # Consul leaves ServiceAddress empty when the service registered
        # without its own address; the node address applies in that case.
        address = service['ServiceAddress'] or service.get('Address', '')
        endpoint = '{}:{}'.format(address, service['ServicePort'])
        returnValue(endpoint)

    def _prepare_work_dir(self):
        """
        Create the work directory if it is missing and purge what earlier
        runs left behind, without going through a shell.
        """
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Safety net kept from the original 'rm -fr /tmp/<dir>/*': the
        # directory being emptied is always forced to live below /tmp.
        # Built by string formatting on purpose: os.path.join would drop
        # the '/tmp' prefix if the remainder were an absolute path.
        purge_dir = '/tmp/%s' % self.work_dir.replace('/tmp/', '')
        if not os.path.isdir(purge_dir):
            return

        for entry in os.listdir(purge_dir):
            if entry.startswith('.'):
                continue  # the shell glob '*' never matched hidden entries
            path = os.path.join(purge_dir, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path, ignore_errors=True)
                continue
            try:
                os.remove(path)
            except OSError:
                log.warning('cannot-remove', path=path)

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory, each next to its decompressed descriptor
        (*.desc) file.
        :return: the swagger_from field of the schema response
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schemas = stub.GetSchema(Empty())

        self._prepare_work_dir()

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            proto_content = proto_file.proto
            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
                      length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.debug('saving-descriptor', fname=desc_fname,
                      dir=self.work_dir, length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.swagger_from

    def _compile_proto_files(self, swagger_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py.
        :param swagger_from: name of the one proto file for which swagger
            output is generated too
        :return: None
        """
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        chameleon_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))

        # Environment for protoc: the plugins must be found on PATH and
        # they need the third-party protos and chameleon itself importable.
        env = dict(os.environ)
        env['PATH'] = ':'.join([os.environ['PATH'], self.plugin_dir])
        env['PYTHONPATH'] = ':'.join([google_api_dir, chameleon_base_dir])

        base_args = [
            'python', '-m', 'grpc.tools.protoc',
            '-I.',
            '-I%s' % google_api_dir,
            '--python_out=.',
            '--grpc_python_out=.',
            '--plugin=protoc-gen-gw=%s/gw_gen.py' % self.plugin_dir,
            '--gw_out=.',
            '--plugin=protoc-gen-swagger=%s/swagger_gen.py' % self.plugin_dir,
        ]

        proto_fnames = [f for f in os.listdir(self.work_dir)
                        if f.endswith('.proto')]
        for fname in proto_fnames:

            need_swagger = fname == swagger_from
            log.debug('compiling', file=fname, need_swagger=need_swagger)

            args = list(base_args)
            if need_swagger:
                args.append('--swagger_out=.')
            args.append(fname)

            log.debug('executing', cmd=' '.join(args), file=fname)
            # The file name originates from the remote schema, so it is
            # passed as a plain argument and never interpreted by a shell.
            exit_code = subprocess.call(args, cwd=self.work_dir, env=env)
            if exit_code:
                log.error('compile-failed', file=fname, exit_code=exit_code)
            else:
                log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        # NOTE(review): a module already present in sys.modules is not
        # re-imported here, so after a reconnect this does not exercise
        # freshly generated code - confirm whether a reload is wanted.
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in os.listdir(self.work_dir):
            if not fname.endswith('_pb2.py'):
                continue
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :param retry: how many times the call is re-issued after the
            server was found unavailable and a reconnect was attempted
        :return: The response protobuf message and returned trailing metadata
        :raise ServiceUnavailable: when not connected, or when the server
            is unavailable and the retry did not produce a response
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except _Rendezvous as e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                # Reported to the caller unless the retry below succeeds.
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e