blob: 790bab3e7df1e6333de0fdd73f2a556e2a22f2ea [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(NullMessage). All of
the client's semantics are derived from the retrieved schema.
"""
Zsolt Harasztid4226ed2016-10-05 17:49:27 -070022
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from werkzeug.exceptions import ServiceUnavailable

from chameleon.asleep import asleep
from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070038
# Module-level structlog logger; GrpcClient emits all of its log events here.
log = get_logger()
40
41
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.
    """

    # Successive waits between reconnect attempts (presumably seconds, as
    # handed to asleep()). The last entry is reused once the list runs out.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
        """
        :param consul_endpoint: 'host:port' string of the consul agent; only
            consulted when `endpoint` names a service (see below)
        :param work_dir: directory where downloaded *.proto / descriptor
            files are saved and compiled
        :param endpoint: either a literal 'host:port' of the gRPC server, or
            '@<service-name>' to have the address looked up in consul
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0
        self.on_reconnect = None
        self.shutting_down = False
        self.connected = False

    def run(self, on_reconnect=None):
        """
        Schedule the first connection attempt on the reactor.
        :param on_reconnect: optional callable scheduled (without arguments)
            after every successful (re-)connect
        :return: self, to allow call chaining
        """
        self.on_reconnect = on_reconnect
        if not self.connected:
            reactor.callLater(0, self.connect)
        return self

    def shutdown(self):
        """
        Flag the client as shutting down so that connect() stops retrying.
        Calling it more than once is harmless.
        """
        if self.shutting_down:
            return
        self.shutting_down = True

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point: open the channel, download and compile the
        schema, then notify the on_reconnect callback. On failure, wait
        according to RETRY_BACKOFF and schedule another attempt.
        """

        if self.shutting_down or self.connected:
            return

        # Bound before the try block so the generic error handler below can
        # always log it, even when the consul lookup itself is what failed.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            self._retrieve_schema()
            self._compile_proto_files()
            self._clear_backoff()

            self.connected = True
            if self.on_reconnect is not None:
                reactor.callLater(0, self.on_reconnect)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # Only reached after a failed attempt (success returns above).
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log `msg` as an error and return the asleep() result for the next
        wait time in RETRY_BACKOFF, bumping the retry counter.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging how many retries were needed.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint from consul, under the service
        name specified by service_name.
        :param service_name: name of the service in the consul catalog
        :return: the endpoint as a 'host:port' string
        :raises Exception: if consul knows no instance of the service
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        consul = Consul(host=host, port=port)
        _, services = consul.catalog.service(service_name)

        if not services:
            raise Exception('Cannot find service %s in consul' % service_name)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.

        service = services[randint(0, len(services) - 1)]

        # Consul leaves ServiceAddress empty when the service was registered
        # without an explicit address; the node's Address applies then.
        address = service['ServiceAddress'] or service['Address']
        return '{}:{}'.format(address, service['ServicePort'])

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schema = stub.GetSchema(NullMessage())

        self._prepare_work_dir()

        for fname in schema.protos:
            content = schema.protos[fname]
            log.debug('saving-proto',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'w') as f:
                f.write(content)

        for fname in schema.descriptors:
            # descriptors arrive zlib-compressed
            content = decompress(schema.descriptors[fname])
            log.debug('saving-descriptor',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'wb') as f:
                f.write(content)

    def _prepare_work_dir(self):
        """
        Make sure the work directory exists and purge leftovers from a
        previous run, without handing any path to a shell.
        """
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Safety net kept from the original 'rm -fr /tmp/<dir>/*': whatever
        # work_dir is, only a location underneath /tmp is ever purged.
        stale_dir = '/tmp/%s' % self.work_dir.replace('/tmp/', '')
        if not os.path.isdir(stale_dir):
            return

        for entry in os.listdir(stale_dir):
            if entry.startswith('.'):
                continue  # the shell '*' glob never matched dot-files either
            path = os.path.join(stale_dir, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path, ignore_errors=True)
                continue
            try:
                os.remove(path)
            except OSError:
                # best effort, as with 'rm -f'; make the failure visible
                log.warning('cannot-remove-stale-file', path=path)

    def _compile_proto_files(self):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py.
        :return: None
        """
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        chameleon_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))

        # Child environment: the plugins must be found on PATH, and the
        # generated code / plugins need the third-party protos and the
        # chameleon package on PYTHONPATH.
        env = dict(os.environ)
        env['PATH'] = ':'.join([os.environ['PATH'], self.plugin_dir])
        env['PYTHONPATH'] = ':'.join([google_api_dir, chameleon_base_dir])

        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]

        for fname in proto_files:

            log.debug('compiling', file=fname)
            # Argument list, not a shell string: file names originate from
            # the remote schema and must never be interpreted by a shell.
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.',
                '-I%s' % google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                '--plugin=protoc-gen-gw=%s/gw_gen.py' % self.plugin_dir,
                '--gw_out=.',
                '--plugin=protoc-gen-swagger=%s/swagger_gen.py'
                % self.plugin_dir,
                '--swagger_out=.',
                fname,
            ]
            log.debug('executing', cmd=' '.join(cmd), file=fname)
            returncode = subprocess.call(cmd, cwd=self.work_dir, env=env)
            if returncode != 0:
                # keep going with the remaining files, but do not claim
                # success for this one
                log.error('compile-failed', file=fname, returncode=returncode)
            else:
                log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('_pb2.py')]:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    def invoke(self, stub, method_name, request):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :return: The response protobuf message
        :raises ServiceUnavailable: if the client is not connected, or the
            server reports UNAVAILABLE; any other gRPC error is re-raised
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            return getattr(stub(self.channel), method_name)(request)

        except _Rendezvous as e:
            unavailable = e.code() == grpc.StatusCode.UNAVAILABLE
            if not unavailable:
                log.exception(e)

            # any gRPC failure triggers a reconnect attempt
            if self.connected:
                self.connected = False
                reactor.callLater(0, self.connect)

            if unavailable:
                raise ServiceUnavailable()
            raise  # bare raise keeps the original traceback