blob: 9a95330753c3b3d79d1181fddd983b5d2a672080 [file] [log] [blame]
Zsolt Haraszti5cd64702016-09-27 13:48:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070018gRPC client meant to connect to a gRPC server endpoint, and query the
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080019end-point's schema by calling SchemaService.Schema(Empty) and all of its
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070020semantics are derived from the recovered schema.
21"""
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070022
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070023import os
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070024import sys
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070025from random import randint
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070026from zlib import decompress
27
Zsolt Haraszti034db372016-10-03 22:26:41 -070028import grpc
29from consul import Consul
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070030from grpc._channel import _Rendezvous
Zsolt Haraszti034db372016-10-03 22:26:41 -070031from structlog import get_logger
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070032from twisted.internet import reactor
33from twisted.internet.defer import inlineCallbacks
34from werkzeug.exceptions import ServiceUnavailable
Zsolt Haraszti034db372016-10-03 22:26:41 -070035
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070036from common.utils.asleep import asleep
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080037from chameleon.protos.schema_pb2 import SchemaServiceStub
38from google.protobuf.empty_pb2 import Empty
39
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070040
41log = get_logger()
42
43
44class GrpcClient(object):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070045 """
46 Connect to a gRPC server, fetch its schema, and process the downloaded
47 schema files to drive the customization of the north-bound interface(s)
48 of Chameleon.
49 """
50 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070051
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070052 def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
53 reconnect_callback=None):
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070054 self.consul_endpoint = consul_endpoint
55 self.endpoint = endpoint
Zsolt Haraszti034db372016-10-03 22:26:41 -070056 self.work_dir = work_dir
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070057 self.reconnect_callback = reconnect_callback
58
Zsolt Haraszti034db372016-10-03 22:26:41 -070059 self.plugin_dir = os.path.abspath(os.path.join(
60 os.path.dirname(__file__), '../protoc_plugins'))
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070061
62 self.channel = None
63 self.schema = None
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070064 self.retries = 0
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070065 self.shutting_down = False
Zsolt Harasztic51c0652016-10-05 20:40:19 -070066 self.connected = False
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070067
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070068 def start(self):
69 log.debug('starting')
Zsolt Harasztic51c0652016-10-05 20:40:19 -070070 if not self.connected:
71 reactor.callLater(0, self.connect)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070072 log.info('started')
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070073 return self
74
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070075 def stop(self):
76 log.debug('stopping')
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070077 if self.shutting_down:
78 return
79 self.shutting_down = True
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070080 log.info('stopped')
81
82 def set_reconnect_callback(self, reconnect_callback):
83 self.reconnect_callback = reconnect_callback
84 return self
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070085
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070086 @inlineCallbacks
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070087 def connect(self):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070088 """
89 (Re-)Connect to end-point
90 """
91
Zsolt Harasztic51c0652016-10-05 20:40:19 -070092 if self.shutting_down or self.connected:
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070093 return
94
95 try:
96 if self.endpoint.startswith('@'):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070097 _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070098 else:
99 _endpoint = self.endpoint
100
101 log.info('connecting', endpoint=_endpoint)
102 self.channel = grpc.insecure_channel(_endpoint)
103
104 self._retrieve_schema()
105 self._compile_proto_files()
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700106 self._clear_backoff()
107
Zsolt Harasztic51c0652016-10-05 20:40:19 -0700108 self.connected = True
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700109 if self.reconnect_callback is not None:
110 reactor.callLater(0, self.reconnect_callback)
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700111
112 return
113
114 except _Rendezvous, e:
115 if e.code() == grpc.StatusCode.UNAVAILABLE:
116 log.info('grpc-endpoint-not-available')
117 else:
118 log.exception(e)
119 yield self._backoff('not-available')
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700120
121 except Exception, e:
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700122 if not self.shutting_down:
123 log.exception('cannot-connect', endpoint=_endpoint)
124 yield self._backoff('unknown-error')
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700125
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700126 reactor.callLater(0, self.connect)
127
128 def _backoff(self, msg):
129 wait_time = self.RETRY_BACKOFF[min(self.retries,
130 len(self.RETRY_BACKOFF) - 1)]
131 self.retries += 1
132 log.error(msg, retry_in=wait_time)
133 return asleep(wait_time)
134
135 def _clear_backoff(self):
136 if self.retries:
137 log.info('reconnected', after_retries=self.retries)
138 self.retries = 0
139
140 def _get_endpoint_from_consul(self, service_name):
141 """
142 Look up an appropriate grpc endpoint (host, port) from
143 consul, under the service name specified by service-name
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700144 """
145 host = self.consul_endpoint.split(':')[0].strip()
146 port = int(self.consul_endpoint.split(':')[1].strip())
147
148 consul = Consul(host=host, port=port)
149 _, services = consul.catalog.service(service_name)
150
151 if len(services) == 0:
152 raise Exception('Cannot find service %s in consul' % service_name)
153
154 # pick a random entry
155 # TODO should we prefer local IP addresses? Probably.
156
157 service = services[randint(0, len(services) - 1)]
158 endpoint = '{}:{}'.format(service['ServiceAddress'],
159 service['ServicePort'])
160 return endpoint
161
162 def _retrieve_schema(self):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700163 """
164 Retrieve schema from gRPC end-point, and save all *.proto files in
165 the work directory.
166 """
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700167 assert isinstance(self.channel, grpc.Channel)
168 stub = SchemaServiceStub(self.channel)
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700169 # try:
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -0800170 schemas = stub.GetSchema(Empty())
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700171 # except _Rendezvous, e:
172 # if e.code == grpc.StatusCode.UNAVAILABLE:
173 #
174 # else:
175 # raise e
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700176
177 os.system('mkdir -p %s' % self.work_dir)
178 os.system('rm -fr /tmp/%s/*' %
179 self.work_dir.replace('/tmp/', '')) # safer
180
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700181 for proto_file in schemas.protos:
182 proto_fname = proto_file.file_name
183 proto_content = proto_file.proto
184 log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
185 length=len(proto_content))
186 with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
187 f.write(proto_content)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700188
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700189 desc_content = decompress(proto_file.descriptor)
190 desc_fname = proto_fname.replace('.proto', '.desc')
191 log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
192 length=len(desc_content))
193 with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
194 f.write(desc_content)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700195
196 def _compile_proto_files(self):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700197 """
198 For each *.proto file in the work directory, compile the proto
199 file into the respective *_pb2.py file as well as generate the
200 web server gateway python file *_gw.py.
201 :return: None
202 """
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700203 google_api_dir = os.path.abspath(os.path.join(
Zsolt Harasztibae12752016-10-10 09:55:30 -0700204 os.path.dirname(__file__), '../protos/third_party'
205 ))
206
207 chameleon_base_dir = os.path.abspath(os.path.join(
208 os.path.dirname(__file__), '../..'
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700209 ))
210
211 for fname in [f for f in os.listdir(self.work_dir)
212 if f.endswith('.proto')]:
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700213
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700214 log.debug('compiling', file=fname)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700215 cmd = (
216 'cd %s && '
Zsolt Haraszti15044082016-10-05 00:18:57 -0700217 'env PATH=%s PYTHONPATH=%s '
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700218 'python -m grpc.tools.protoc '
219 '-I. '
220 '-I%s '
221 '--python_out=. '
222 '--grpc_python_out=. '
Zsolt Haraszti034db372016-10-03 22:26:41 -0700223 '--plugin=protoc-gen-gw=%s/gw_gen.py '
224 '--gw_out=. '
Zsolt Harasztibae12752016-10-10 09:55:30 -0700225 '--plugin=protoc-gen-swagger=%s/swagger_gen.py '
226 '--swagger_out=. '
Zsolt Haraszti034db372016-10-03 22:26:41 -0700227 '%s' % (
228 self.work_dir,
229 ':'.join([os.environ['PATH'], self.plugin_dir]),
Zsolt Harasztibae12752016-10-10 09:55:30 -0700230 ':'.join([google_api_dir, chameleon_base_dir]),
Zsolt Haraszti034db372016-10-03 22:26:41 -0700231 google_api_dir,
Zsolt Harasztibae12752016-10-10 09:55:30 -0700232 self.plugin_dir,
Zsolt Haraszti034db372016-10-03 22:26:41 -0700233 self.plugin_dir,
234 fname)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700235 )
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700236 log.debug('executing', cmd=cmd, file=fname)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700237 os.system(cmd)
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700238 log.info('compiled', file=fname)
Zsolt Haraszti5cd64702016-09-27 13:48:35 -0700239
240 # test-load each _pb2 file to see all is right
241 if self.work_dir not in sys.path:
242 sys.path.insert(0, self.work_dir)
243
244 for fname in [f for f in os.listdir(self.work_dir)
245 if f.endswith('_pb2.py')]:
246 modname = fname[:-len('.py')]
247 log.debug('test-import', modname=modname)
248 _ = __import__(modname)
Zsolt Haraszti034db372016-10-03 22:26:41 -0700249
250 def invoke(self, stub, method_name, request):
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700251 """
252 Invoke a gRPC call to the remote server and return the response.
253 :param stub: Reference to the *_pb2 service stub
254 :param method_name: The method name inside the service stub
255 :param request: The request protobuf message
256 :return: The response protobuf message
257 """
258
Zsolt Harasztic51c0652016-10-05 20:40:19 -0700259 if not self.connected:
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700260 raise ServiceUnavailable()
261
262 try:
263 response = getattr(stub(self.channel), method_name)(request)
264 return response
265
266 except grpc._channel._Rendezvous, e:
267 if e.code() == grpc.StatusCode.UNAVAILABLE:
268 e = ServiceUnavailable()
269 else:
270 log.exception(e)
271
Zsolt Harasztic51c0652016-10-05 20:40:19 -0700272 if self.connected :
273 self.connected = False
274 reactor.callLater(0, self.connect)
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700275
276 raise e