blob: 75c78ebaaee486c1634a3852faacf60273239cb6 [file] [log] [blame]
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(NullMessage). All of
the client's semantics are derived from the recovered schema.
"""
Zsolt Harasztie7b60762016-10-05 17:49:27 -070022
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070023import os
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070024import sys
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070025from random import randint
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070026from zlib import decompress
27
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070028import grpc
29from consul import Consul
Zsolt Harasztie7b60762016-10-05 17:49:27 -070030from grpc._channel import _Rendezvous
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070031from structlog import get_logger
Zsolt Harasztie7b60762016-10-05 17:49:27 -070032from twisted.internet import reactor
33from twisted.internet.defer import inlineCallbacks
34from werkzeug.exceptions import ServiceUnavailable
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070035
Zsolt Harasztiabae5912016-10-16 19:30:34 -070036from common.utils.asleep import asleep
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070037from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070038
# Module-level structured logger used by everything in this file.
log = get_logger()
40
41
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.

    Connection attempts are scheduled on the Twisted reactor. A failed
    attempt is followed by a wait taken from RETRY_BACKOFF and then by a
    new attempt, until the client connects or is stopped.
    """

    # Wait times between consecutive connection attempts (presumably
    # seconds, as handed to asleep()). The last entry is reused once the
    # number of retries exceeds the length of the list.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
                 reconnect_callback=None):
        """
        :param consul_endpoint: 'host:port' string of the consul agent; only
            used when `endpoint` starts with '@'
        :param work_dir: Directory where the downloaded *.proto / *.desc
            files are stored and compiled
        :param endpoint: Either the gRPC endpoint itself, or '@<service>'
            to have the endpoint looked up in consul under that service name
        :param reconnect_callback: Optional callable scheduled on the
            reactor (without arguments) after each successful (re-)connect
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        """
        Schedule the initial connection attempt, unless already connected.
        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Mark the client as shutting down so that no further connection
        attempts are made. Calling it more than once is harmless.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the callback scheduled after each successful (re-)connect.
        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point

        Resolves the endpoint (through consul when it starts with '@'),
        opens an insecure channel, downloads and compiles the schema, and
        finally schedules the reconnect callback, if any. On any failure
        the method waits for the current back-off interval and then
        re-schedules itself.
        """

        if self.shutting_down or self.connected:
            return

        # Bind the name before entering the try block: the consul lookup
        # itself may raise, and the generic handler below logs _endpoint.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            self._retrieve_schema()
            self._compile_proto_files()
            self._clear_backoff()

            self.connected = True
            if self.reconnect_callback is not None:
                reactor.callLater(0, self.reconnect_callback)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # Only reached after a failed attempt (success returns above).
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log `msg` as an error, bump the retry counter and return whatever
        asleep() returns for the selected wait time, so that the caller can
        yield on it.
        :param msg: Event name to log
        """
        index = min(self.retries, len(self.RETRY_BACKOFF) - 1)
        wait_time = self.RETRY_BACKOFF[index]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging the number of retries it took to
        get (re-)connected if there were any.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service-name
        :param service_name: Name of the service in the consul catalog
        :return: Endpoint as a '<address>:<port>' string
        :raises Exception: If consul has no entry for the service
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        consul = Consul(host=host, port=port)
        _, services = consul.catalog.service(service_name)

        if not services:
            raise Exception('Cannot find service %s in consul' % service_name)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.
        service = services[randint(0, len(services) - 1)]
        return '{}:{}'.format(service['ServiceAddress'],
                              service['ServicePort'])

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory, each along with its decompressed descriptor
        (*.desc). Errors raised by the gRPC call propagate to the caller.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schemas = stub.GetSchema(NullMessage())

        os.system('mkdir -p %s' % self.work_dir)
        # The wipe is deliberately confined to /tmp: the '/tmp/' prefix is
        # stripped from work_dir and then re-applied, so a work_dir outside
        # of /tmp is never emptied by this command.
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            proto_content = proto_file.proto
            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
                      length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            # The descriptor is delivered zlib-compressed.
            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                      length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)

    def _compile_proto_files(self):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py.

        A compiler run that exits with a non-zero status is logged as an
        error but does not abort the remaining files. Afterwards every
        generated *_pb2.py module is imported once as a sanity check.
        :return: None
        """
        here = os.path.dirname(__file__)
        google_api_dir = os.path.abspath(
            os.path.join(here, '../protos/third_party'))
        chameleon_base_dir = os.path.abspath(os.path.join(here, '../..'))
        python_path = ':'.join([google_api_dir, chameleon_base_dir])

        proto_fnames = [f for f in os.listdir(self.work_dir)
                        if f.endswith('.proto')]

        for fname in proto_fnames:

            log.debug('compiling', file=fname)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/gw_gen.py '
                '--gw_out=. '
                '--plugin=protoc-gen-swagger=%s/swagger_gen.py '
                '--swagger_out=. '
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    python_path,
                    google_api_dir,
                    self.plugin_dir,
                    self.plugin_dir,
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            status = os.system(cmd)
            if status != 0:
                # Do not claim success when the compiler reported a failure.
                log.error('compile-failed', file=fname, status=status)
            else:
                log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        pb2_fnames = [f for f in os.listdir(self.work_dir)
                      if f.endswith('_pb2.py')]
        for fname in pb2_fnames:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    def invoke(self, stub, method_name, request):
        """
        Invoke a gRPC call to the remote server and return the response.
        Any gRPC failure also marks the client as disconnected and
        schedules a reconnect.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :return: The response protobuf message
        :raises ServiceUnavailable: If the client is not connected, or the
            server reports the UNAVAILABLE status code
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            response = getattr(stub(self.channel), method_name)(request)
            return response

        except grpc._channel._Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                error = ServiceUnavailable()
            else:
                log.exception(e)
                error = e

            if self.connected:
                self.connected = False
                reactor.callLater(0, self.connect)

            raise error